helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [08/17] [HELIX-395] Remove old Helix alert/stat modules
Date Fri, 11 Jul 2014 19:58:03 GMT
http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
deleted file mode 100644
index 2fd95e9..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
+++ /dev/null
@@ -1,494 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-
-public class ExpressionParser {
-  private static Logger logger = Logger.getLogger(ExpressionParser.class);
-
-  final static String opDelim = "|";
-  final static String opDelimForSplit = "\\|";
-  final static String argDelim = ",";
-  final public static String statFieldDelim = ".";
-  final static String wildcardChar = "*";
-
-  // static Map<String, ExpressionOperatorType> operatorMap = new
-  // HashMap<String, ExpressionOperatorType>();
-
-  static Map<String, Operator> operatorMap = new HashMap<String, Operator>();
-  static Map<String, Aggregator> aggregatorMap = new HashMap<String, Aggregator>();
-
-  static {
-    // Operators that may follow the first opDelim ('|') of an expression.
-    addOperatorEntry("EXPAND", new ExpandOperator());
-    addOperatorEntry("DIVIDE", new DivideOperator());
-    addOperatorEntry("SUM", new SumOperator());
-    addOperatorEntry("SUMEACH", new SumEachOperator());
-
-    // Aggregators that may lead a stat, i.e. the "name" in name(args)(stats).
-    addAggregatorEntry("ACCUMULATE", new AccumulateAggregator());
-    addAggregatorEntry("DECAY", new DecayAggregator());
-    addAggregatorEntry("WINDOW", new WindowAggregator());
-  }
-
-  // static Pattern pattern = Pattern.compile("(\\{.+?\\})");
-
-  /**
-   * Registers {@code op} under the upper-cased form of {@code label}, unless an operator is
-   * already registered under that key.
-   * <p>
-   * The key is upper-cased so that registration agrees with {@code getOperators}, which looks
-   * operators up by {@code op.toUpperCase()}, and with {@code addAggregatorEntry}, which
-   * normalizes its keys the same way.
-   * @param label operator name; case is ignored
-   * @param op operator implementation to register
-   */
-  private static void addOperatorEntry(String label, Operator op) {
-    String key = label.toUpperCase();
-    if (!operatorMap.containsKey(key)) {
-      operatorMap.put(key, op);
-    }
-    // logged whether or not the entry was newly inserted (unchanged from before)
-    logger.info("Adding operator: " + op);
-  }
-
-  /**
-   * Registers {@code agg} under the upper-cased form of {@code label}, unless an aggregator is
-   * already registered under that key. The log line is written in either case.
-   * @param label aggregator name; case is ignored
-   * @param agg aggregator implementation to register
-   */
-  private static void addAggregatorEntry(String label, Aggregator agg) {
-    String key = label.toUpperCase();
-    boolean alreadyRegistered = aggregatorMap.containsKey(key);
-    if (!alreadyRegistered) {
-      aggregatorMap.put(key, agg);
-    }
-    logger.info("Adding aggregator: " + agg);
-  }
-
-  /*
-   * private static void addEntry(String label, ExpressionOperatorType type) {
-   * if (!operatorMap.containsKey(label)) { operatorMap.put(label, type); }
-   * logger.info("Adding operator type: "+type); }
-   */
-
-  /**
-   * Tells whether the expression contains an opening parenthesis anywhere.
-   * @param expression expression text; must not be null
-   * @return true if at least one '(' is present
-   */
-  public static boolean isExpressionNested(String expression) {
-    return expression.indexOf('(') >= 0;
-  }
-
-  /*
-   * public static Operator getOperatorType(String expression) throws Exception
-   * { String op = expression.substring(0,expression.indexOf("(")); if
-   * (!operatorMap.containsKey(op)) { throw new
-   * Exception(op+" is not a valid op type"); } return operatorMap.get(op); }
-   */
-
-  /**
-   * Returns the text between the first '(' and the last ')' of the expression.
-   * @param expression expression text; must not be null
-   * @return the enclosed text, possibly empty
-   * @throws StringIndexOutOfBoundsException if the last ')' does not come after the first '('
-   *           (including when there is no ')' at all)
-   */
-  public static String getInnerExpression(String expression) {
-    int firstOpen = expression.indexOf('(');
-    int lastClose = expression.lastIndexOf(')');
-    return expression.substring(firstOpen + 1, lastClose);
-  }
-
-  /*
-   * public static String[] getBaseStats(ExpressionOperatorType type, String
-   * expression) throws Exception { String[] items = null; if
-   * (isExpressionNested(expression)) { ExpressionOperatorType nextType =
-   * getOperatorType(expression); String innerExp =
-   * getInnerExpression(expression); items = getBaseStats(nextType, innerExp); }
-   * else { //base class, no nesting items = expression.split(","); }
-   * if (type != null && type.isBaseOp()) { //surround items with type. for (int
-   * i=0; i<items.length; i++) { items[i] = type + "(" + items[i] + ")"; //!!!!
-   * NEED type to behave like string here
-   * logger.debug("Forming item "+items[i]); } } return items; }
-   * public static String[] getBaseStats(String expression) throws Exception {
-   * expression = expression.replaceAll("\\s+", ""); return getBaseStats(null,
-   * expression); }
-   */
-
-  /**
-   * Validates the parenthesized part of an expression of the form {@code agg(args)(stats)}.
-   * <p>
-   * Exactly two parenthesis groups must be present: the first (aggregator arguments) may be
-   * empty, the second (stats) must not be. No further parentheses may appear inside either
-   * group or after the second one. Within the stat group, every '.'-separated field may contain
-   * a wildcard only as its last character.
-   * @param expression expression to check
-   * @throws HelixException describing the first violation found
-   */
-  public static void validateAggregatorFormat(String expression) throws HelixException {
-    logger.debug("validating aggregator for expression: " + expression);
-    // reluctant match, so each find() yields one "(...)" group, e.g. ()(x) or (2)(x,y)
-    Matcher groups = Pattern.compile("\\(.*?\\)").matcher(expression);
-
-    // first group: aggregator arguments, zero or more
-    if (!groups.find()) {
-      throw new HelixException(expression + " has invalid aggregate component");
-    }
-    String aggPart = groups.group();
-    aggPart = aggPart.substring(1, aggPart.length() - 1);
-    if (aggPart.contains("(") || aggPart.contains(")")) {
-      throw new HelixException(expression + " has invalid aggregate component");
-    }
-
-    // second group: stats, at least one character between the parentheses
-    if (!groups.find()) {
-      throw new HelixException(expression + " has invalid stat component");
-    }
-    String statPart = groups.group();
-    statPart = statPart.substring(1, statPart.length() - 1);
-    if (statPart.length() == 0 || statPart.contains("(") || statPart.contains(")")) {
-      throw new HelixException(expression + " has invalid stat component");
-    }
-    int statGroupEnd = groups.end(); // index one past the closing ')' of the stat group
-
-    if (groups.find()) {
-      throw new HelixException(expression + " has too many parenthesis components");
-    }
-
-    // anything after the stat group (the operator chain) must be free of parentheses
-    String tail = expression.substring(statGroupEnd);
-    if (tail.contains("(") || tail.contains(")")) {
-      throw new HelixException(expression + " has extra parenthesis");
-    }
-
-    // a field holds at most one wildcard, and only as its final character
-    StringTokenizer fields = new StringTokenizer(statPart, statFieldDelim);
-    while (fields.hasMoreTokens()) {
-      String field = fields.nextToken();
-      int firstWildcard = field.indexOf(wildcardChar);
-      if (firstWildcard >= 0 && firstWildcard != field.length() - 1) {
-        throw new HelixException(field
-            + " is illegal stat name.  Single wildcard must appear at end.");
-      }
-    }
-  }
-
-  /**
-   * Tells whether the stat name contains the wildcard character anywhere.
-   * @param stat stat name; must not be null
-   * @return true if at least one wildcard is present
-   */
-  public static boolean statContainsWildcards(String stat) {
-    return stat.indexOf(wildcardChar) >= 0;
-  }
-
-  /**
-   * Compares {@code incomingStat} for equality against {@code currentStat}.
-   * <p>
-   * With {@code extractStatFromAgg} false the two strings are compared as given, e.g.
-   * accumulate()(dbFoo.partition10.latency) against accumulate()(dbFoo.partition10.latency).
-   * With {@code extractStatFromAgg} true the single stat name is first extracted from
-   * {@code currentStat}, so accumulate()(dbFoo.partition10.latency) is compared as
-   * dbFoo.partition10.latency.
-   * @param currentStat stat to compare against, optionally carrying an aggregator
-   * @param incomingStat stat to test; must not be null
-   * @param extractStatFromAgg whether to strip the aggregator from {@code currentStat} first
-   * @return true if the strings are equal
-   */
-  public static boolean isExactMatch(String currentStat, String incomingStat,
-      boolean extractStatFromAgg) {
-    String nameToCompare =
-        extractStatFromAgg ? getSingleAggregatorStat(currentStat) : currentStat;
-    return incomingStat.equals(nameToCompare);
-  }
-
-  /**
-   * Returns true if {@code incomingStat} matches the wildcarded stat name of
-   * {@code currentStat}.
-   * <p>
-   * Two modes:
-   * <ul>
-   * <li>{@code statCompareOnly} false: the text before the first ')' of both arguments (the
-   * aggregator name and its arguments) must be equal as well, e.g.
-   * accumulate()(dbFoo.partition*.latency) against accumulate()(dbFoo.partition10.latency)</li>
-   * <li>{@code statCompareOnly} true: only the stat names are compared, e.g.
-   * accumulate()(dbFoo.partition*.latency) against dbFoo.partition10.latency</li>
-   * </ul>
-   * Each wildcard stands for any run of characters; every other character of the stat name is
-   * matched literally.
-   * <p>
-   * NOTE(review): a wildcard may also span the '.' field delimiter, so a*.c* matches a5.b6.c7.
-   * Earlier documentation of this method said it should not - confirm which is intended.
-   * @param currentStat stat that may contain wildcards
-   * @param incomingStat concrete stat to test
-   * @param statCompareOnly true to skip the aggregator comparison
-   * @param bindings if non-null, receives the full incoming stat name on a match
-   * @return true on a wildcard match; false if {@code currentStat} has no wildcard
-   */
-  public static boolean isWildcardMatch(String currentStat, String incomingStat,
-      boolean statCompareOnly, ArrayList<String> bindings) {
-    if (!statCompareOnly) { // need to check for match on agg type and stat
-      String currentStatAggType = (currentStat.split("\\)"))[0];
-      String incomingStatAggType = (incomingStat.split("\\)"))[0];
-      if (!currentStatAggType.equals(incomingStatAggType)) {
-        return false;
-      }
-    }
-    // now just get the stats
-    String currentStatName = getSingleAggregatorStat(currentStat);
-    String incomingStatName = getSingleAggregatorStat(incomingStat);
-
-    if (!currentStatName.contains(wildcardChar)) { // no wildcards in stat name
-      return false;
-    }
-
-    // Build the regex piecewise: literal text is quoted so that characters such as '(', '+'
-    // or '[' in a stat name cannot be read as regex syntax; each wildcard becomes ".*".
-    StringBuilder regex = new StringBuilder();
-    int literalStart = 0;
-    int wildcardAt = currentStatName.indexOf(wildcardChar, literalStart);
-    while (wildcardAt >= 0) {
-      regex.append(Pattern.quote(currentStatName.substring(literalStart, wildcardAt)));
-      regex.append(".*");
-      literalStart = wildcardAt + wildcardChar.length();
-      wildcardAt = currentStatName.indexOf(wildcardChar, literalStart);
-    }
-    regex.append(Pattern.quote(currentStatName.substring(literalStart)));
-
-    boolean result = Pattern.matches(regex.toString(), incomingStatName);
-    if (result && bindings != null) {
-      bindings.add(incomingStatName);
-    }
-    return result;
-  }
-
-  /**
-   * Exact-match check of an incoming stat (no aggregator) against a persisted stat (with
-   * aggregator). Delegates to {@code isExactMatch} with stat extraction enabled.
-   * @param currentStat persisted stat, including its aggregator
-   * @param incomingStat incoming stat name
-   * @return true if the incoming name equals the stat name inside {@code currentStat}
-   */
-  public static boolean isIncomingStatExactMatch(String currentStat, String incomingStat) {
-    final boolean extractStatFromAgg = true;
-    return isExactMatch(currentStat, incomingStat, extractStatFromAgg);
-  }
-
-  /**
-   * Wildcard-match check of an incoming stat (no aggregator) against a persisted stat (with
-   * aggregator) that may contain wildcards. Delegates to {@code isWildcardMatch}, comparing
-   * stat names only and collecting no bindings.
-   * @param currentStat persisted stat, possibly wildcarded
-   * @param incomingStat incoming stat name
-   * @return true on a wildcard match
-   */
-  public static boolean isIncomingStatWildcardMatch(String currentStat, String incomingStat) {
-    final boolean statCompareOnly = true;
-    return isWildcardMatch(currentStat, incomingStat, statCompareOnly, null);
-  }
-
-  /**
-   * Exact-match check of a persisted stat against a stat defined in an alert. Delegates to
-   * {@code isExactMatch} without stat extraction, so both strings are compared as given.
-   * @param alertStat stat as written in the alert
-   * @param currentStat persisted stat
-   * @return true if the two strings are equal
-   */
-  public static boolean isAlertStatExactMatch(String alertStat, String currentStat) {
-    final boolean extractStatFromAgg = false;
-    return isExactMatch(alertStat, currentStat, extractStatFromAgg);
-  }
-
-  /**
-   * Wildcard-match check of a maintained stat against a stat defined in an alert, which may
-   * contain wildcards. Delegates to {@code isWildcardMatch} with the aggregator comparison
-   * enabled.
-   * @param alertStat stat as written in the alert, possibly wildcarded
-   * @param currentStat maintained stat
-   * @param wildcardBindings if non-null, receives the matched stat name
-   * @return true on a wildcard match
-   */
-  public static boolean isAlertStatWildcardMatch(String alertStat, String currentStat,
-      ArrayList<String> wildcardBindings) {
-    final boolean statCompareOnly = false;
-    return isWildcardMatch(alertStat, currentStat, statCompareOnly, wildcardBindings);
-  }
-
-  /**
-   * Looks up a registered aggregator by name, ignoring case.
-   * @param aggStr aggregator name
-   * @return the registered aggregator
-   * @throws HelixException if nothing is registered under the upper-cased name
-   */
-  public static Aggregator getAggregator(String aggStr) throws HelixException {
-    String key = aggStr.toUpperCase();
-    Aggregator found = aggregatorMap.get(key);
-    if (found != null) {
-      return found;
-    }
-    throw new HelixException("Unknown aggregator type " + key);
-  }
-
-  /**
-   * Extracts the aggregator name, i.e. the text before the first '(' of the expression, and
-   * checks that it is a registered aggregator (case-insensitively).
-   * @param expression expression of the form name(args)(stats)
-   * @return the aggregator name exactly as written in the expression
-   * @throws HelixException if there is no '(' or the name is not registered
-   */
-  public static String getAggregatorStr(String expression) throws HelixException {
-    int firstOpen = expression.indexOf("(");
-    if (firstOpen < 0) {
-      throw new HelixException(expression
-          + " does not contain a valid aggregator.  No parentheses found");
-    }
-    String aggName = expression.substring(0, firstOpen);
-    boolean known = aggregatorMap.containsKey(aggName.toUpperCase());
-    if (!known) {
-      throw new HelixException("aggregator <" + aggName + "> is unknown type");
-    }
-    return aggName;
-  }
-
-  /**
-   * Splits the aggregator argument list of an expression and checks the argument count against
-   * what the aggregator requires.
-   * @param expression expression of the form name(args)(stats)
-   * @return the arguments split on ','; for an empty argument list this is the one-element
-   *         array produced by splitting the empty string, kept for backward compatibility
-   * @throws HelixException if the aggregator is unknown or the argument count is wrong
-   */
-  public static String[] getAggregatorArgs(String expression) throws HelixException {
-    String aggregator = getAggregatorStr(expression);
-    String argsStr = getAggregatorArgsStr(expression);
-    String[] args = argsStr.split(argDelim);
-    logger.debug("args size: " + args.length);
-    // "".split(",") yields one empty element, so an empty list must be counted as zero
-    int numArgs = (argsStr.length() == 0) ? 0 : args.length;
-    // verify correct number of args
-    int requiredNumArgs = aggregatorMap.get(aggregator.toUpperCase()).getRequiredNumArgs();
-    if (numArgs != requiredNumArgs) {
-      // report the count that was compared, not the raw split length
-      throw new HelixException(expression + " contains " + numArgs
-          + " arguments, but requires " + requiredNumArgs);
-    }
-    return args;
-  }
-
-  /*
-   * public static String[] getAggregatorArgsList(String expression) { String
-   * argsStr = getAggregatorArgsStr(expression); String[] args =
-   * argsStr.split(argDelim); return args; }
-   */
-
-  /**
-   * Returns the raw aggregator argument text: everything between the first '(' and the first
-   * ')' of the expression.
-   * @param expression expression of the form name(args)(stats); must not be null
-   * @return the argument text, possibly empty
-   * @throws StringIndexOutOfBoundsException if the first ')' does not come after the first '('
-   */
-  public static String getAggregatorArgsStr(String expression) {
-    int argsStart = expression.indexOf("(") + 1;
-    int argsEnd = expression.indexOf(")");
-    return expression.substring(argsStart, argsEnd);
-  }
-
-  /**
-   * Returns the stat names of an expression, split on ','.
-   * <p>
-   * If the expression contains both '(' and ')', the stats are the text between the last '('
-   * and the last ')'; otherwise the whole expression is treated as the stat list.
-   * @param expression expression, with or without an aggregator
-   * @return the stat names; never an empty array
-   * @throws HelixException if splitting yields no elements
-   */
-  public static String[] getAggregatorStats(String expression) throws HelixException {
-    int lastOpen = expression.lastIndexOf("(");
-    int lastClose = expression.lastIndexOf(")");
-    String statsPart;
-    if (lastOpen >= 0 && lastClose >= 0) {
-      statsPart = expression.substring(lastOpen + 1, lastClose);
-    } else {
-      statsPart = expression;
-    }
-    String[] stats = statsPart.split(argDelim);
-    if (stats.length == 0) {
-      throw new HelixException(expression + " does not contain any aggregator stats");
-    }
-    return stats;
-  }
-
-  /**
-   * Returns the one stat name of an expression that must contain exactly one.
-   * @param expression expression, with or without an aggregator
-   * @return the single stat name
-   * @throws HelixException if the expression contains more than one stat
-   */
-  public static String getSingleAggregatorStat(String expression) throws HelixException {
-    String[] stats = getAggregatorStats(expression);
-    if (stats.length <= 1) {
-      return stats[0];
-    }
-    throw new HelixException(expression + " contains more than 1 stat");
-  }
-
-  /**
-   * Replaces the stat list of {@code wildcardStat} with {@code fixedStat}: keeps everything up
-   * to and including the last '(', then appends {@code fixedStat} and a closing ')'.
-   * @param wildcardStat stat expression whose last parenthesis group is replaced
-   * @param fixedStat concrete stat name to substitute in
-   * @return the substituted expression
-   */
-  public static String getWildcardStatSubstitution(String wildcardStat, String fixedStat) {
-    String prefix = wildcardStat.substring(0, wildcardStat.lastIndexOf("(") + 1);
-    String subbedStat = prefix + fixedStat + ")";
-    logger.debug("wildcardStat: " + wildcardStat);
-    logger.debug("fixedStat: " + fixedStat);
-    logger.debug("subbedStat: " + subbedStat);
-    return subbedStat;
-  }
-
-  // XXX: each op type should have number of inputs, number of outputs. do
-  // validation.
-  // (dbFoo.partition*.latency, dbFoo.partition*.count)|EACH|ACCUMULATE|DIVIDE
-  /**
-   * Expands an expression into one base stat per listed stat name, each of the form
-   * {@code agg(args)(stat)}.
-   * <p>
-   * All whitespace is removed first; the result is then validated for format, aggregator name
-   * and argument count before the base stats are built.
-   * @param expression expression of the form name(args)(stat1,stat2,...)
-   * @return one base stat string per stat name, in the order listed
-   * @throws HelixException if any validation step fails
-   */
-  public static String[] getBaseStats(String expression) throws HelixException {
-    String compact = expression.replaceAll("\\s+", "");
-    validateAggregatorFormat(compact);
-
-    String aggName = getAggregatorStr(compact);
-    getAggregatorArgs(compact); // called for its argument-count validation only
-    String[] statNames = getAggregatorStats(compact);
-
-    // every base stat shares the same "agg(args)(" prefix
-    String prefix = aggName + "(" + getAggregatorArgsStr(compact) + ")" + "(";
-
-    String[] baseStats = new String[statNames.length];
-    int idx = 0;
-    for (String statName : statNames) {
-      baseStats[idx++] = prefix + statName + ")";
-    }
-    return baseStats;
-  }
-
-  /**
-   * Extracts and validates the operator chain of an expression, i.e. the '|'-separated names
-   * after the first '|'.
-   * <p>
-   * Starting from the number of stats in the expression, each operator must accept the current
-   * number of tuple lists, and the chain must end with exactly one.
-   * @param expression full expression, e.g. agg(args)(stats)|OP1|OP2
-   * @return the operator names as written, or null if the expression has no '|'
-   * @throws HelixException if an operator is unknown, cannot take the current number of tuple
-   *           lists, or the chain does not end in a single tuple list
-   */
-  public static String[] getOperators(String expression) throws HelixException {
-    int tupleLists = getAggregatorStats(expression).length;
-    int firstDelim = expression.indexOf(opDelim);
-    if (firstDelim < 0) {
-      return null;
-    }
-    String opsStr = expression.substring(firstDelim + 1);
-    logger.debug("ops str: " + opsStr);
-    String[] ops = opsStr.split(opDelimForSplit);
-
-    // walk the chain, tracking how many tuple lists flow from one op to the next
-    for (int i = 0; i < ops.length; i++) {
-      String op = ops[i];
-      logger.debug("op: " + op);
-      String key = op.toUpperCase();
-      if (!operatorMap.containsKey(key)) {
-        throw new HelixException("<" + op + "> is not a valid operator type");
-      }
-      Operator operator = operatorMap.get(key);
-      boolean tooFew = tupleLists < operator.minInputTupleLists;
-      boolean tooMany = tupleLists > operator.maxInputTupleLists;
-      if (tooFew || tooMany) {
-        throw new HelixException("<" + op + "> cannot process " + tupleLists + " input tuples");
-      }
-      // an op that keeps input and output counts equal leaves the running count untouched
-      if (!operator.inputOutputTupleListsCountsEqual) {
-        tupleLists = operator.numOutputTupleLists;
-      }
-    }
-    if (tupleLists != 1) {
-      throw new HelixException(expression + " does not terminate in a single tuple set");
-    }
-    return ops;
-  }
-
-  /**
-   * Validates the operator chain of an expression by running {@code getOperators} and
-   * discarding its result.
-   * @param expression full expression
-   * @throws HelixException if {@code getOperators} rejects the chain
-   */
-  public static void validateOperators(String expression) throws HelixException {
-    // only the exceptions matter here; the returned operator names are not needed
-    getOperators(expression);
-  }
-
-  /**
-   * Looks up a registered operator by name, ignoring case.
-   * <p>
-   * {@code getOperators} validates names via {@code toUpperCase()} but returns them as
-   * written, so this lookup must be case-insensitive too; otherwise a lower-case name that
-   * passed validation would be rejected here.
-   * @param opName operator name
-   * @return the registered operator
-   * @throws HelixException if no operator is registered under the upper-cased name
-   */
-  public static Operator getOperator(String opName) throws HelixException {
-    String key = opName.toUpperCase();
-    if (!operatorMap.containsKey(key)) {
-      throw new HelixException(opName + " is unknown op type");
-    }
-    return operatorMap.get(key);
-  }
-
-  /**
-   * Validates a full expression in two steps: first the aggregator/stat part, then the
-   * operator chain.
-   * @param expression full expression
-   * @throws HelixException from whichever step fails first
-   */
-  public static void validateExpression(String expression) throws HelixException {
-    // step 1: parenthesized aggregator and stat components
-    validateAggregatorFormat(expression);
-    // step 2: operators exist and their input/output counts chain correctly
-    validateOperators(expression);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
deleted file mode 100644
index 0e9c8f1..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class GreaterAlertComparator extends AlertComparator {
-
-  /**
-   * Returns true if any element of the left tuple is strictly greater than any element of the
-   * right tuple. Elements are parsed as doubles.
-   * @param leftTup tuple of numeric strings
-   * @param rightTup tuple of numeric strings
-   * @return true as soon as one left element exceeds one right element, false otherwise
-   */
-  @Override
-  public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
-    for (Iterator<String> lefts = leftTup.iterator(); lefts.hasNext();) {
-      double left = Double.parseDouble(lefts.next());
-      // compare this left element against every right element
-      for (Iterator<String> rights = rightTup.iterator(); rights.hasNext();) {
-        double right = Double.parseDouble(rights.next());
-        if (left > right) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
deleted file mode 100644
index 74a4688..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Operator that multiplies its input tuple lists row by row into a single output tuple list.
- */
-public class MultiplyOperator extends Operator {
-
-  public MultiplyOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = false;
-    numOutputTupleLists = 1;
-  }
-
-  /**
-   * Wraps one tuple list as a single-element list holding its iterator.
-   * @param input tuple list to wrap
-   * @return a list containing only {@code input.iterator()}
-   */
-  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
-    // fully parameterized: the raw List/ArrayList used before made this return unchecked
-    List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
-    out.add(input.iterator());
-    return out;
-  }
-
-  /**
-   * Multiplies the i-th tuples of all input lists together, for each row i, stopping as soon
-   * as any input iterator is exhausted. A row that is in progress when an iterator runs out is
-   * dropped.
-   * @param input iterators over the tuple lists to multiply; may be null or empty
-   * @return a single-element list with an iterator over the row products
-   */
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-    if (input == null || input.isEmpty()) {
-      return singleSetToIter(output);
-    }
-    while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
-                   // progress)
-      Tuple<String> rowProduct = null;
-      for (Iterator<Tuple<String>> it : input) {
-        if (!it.hasNext()) { // when any iterator runs out, we are done
-          return singleSetToIter(output);
-        }
-        // multiplyTuples returns the other operand when one side is null, seeding the product
-        rowProduct = multiplyTuples(rowProduct, it.next());
-      }
-      output.add(rowProduct);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
deleted file mode 100644
index 0612cf3..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Base class for operators that transform lists of tuple iterators. Also provides
- * element-wise tuple arithmetic helpers for subclasses.
- */
-public abstract class Operator {
-
-  // Smallest and largest number of input tuple lists this operator accepts.
-  public int minInputTupleLists;
-  public int maxInputTupleLists;
-  // Number of tuple lists produced; only consulted when the flag below is false.
-  public int numOutputTupleLists = -1;
-  // True if the operator emits as many tuple lists as it receives.
-  public boolean inputOutputTupleListsCountsEqual = false;
-
-  public Operator() {
-
-  }
-
-  /**
-   * Multiplies two tuples element-wise, aligned at their last elements.
-   * <p>
-   * If the tuples differ in length, the leading elements of the longer one are copied through
-   * unchanged, e.g. 1,2,3 * 4,5 = 1,8.0,15.0.
-   * @param tup1 first tuple of numeric strings, or null
-   * @param tup2 second tuple of numeric strings, or null
-   * @return the other tuple if one is null, otherwise a new tuple with the products
-   */
-  public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    return combineTuples(tup1, tup2, true);
-  }
-
-  /**
-   * Adds two tuples element-wise, aligned at their last elements.
-   * <p>
-   * If the tuples differ in length, the leading elements of the longer one are copied through
-   * unchanged, e.g. 1,2,3 + 4,5 = 1,6.0,8.0.
-   * @param tup1 first tuple of numeric strings, or null
-   * @param tup2 second tuple of numeric strings, or null
-   * @return the other tuple if one is null, otherwise a new tuple with the sums
-   */
-  public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    return combineTuples(tup1, tup2, false);
-  }
-
-  /**
-   * Shared implementation of multiplyTuples and sumTuples: combines two tuples element-wise,
-   * right-aligned, using multiplication if {@code multiply} is true and addition otherwise.
-   */
-  private static Tuple<String> combineTuples(Tuple<String> tup1, Tuple<String> tup2,
-      boolean multiply) {
-    if (tup1 == null) {
-      return tup2;
-    }
-    if (tup2 == null) {
-      return tup1;
-    }
-    Tuple<String> outputTup = new Tuple<String>();
-
-    // on equal sizes tup1 counts as the larger one, as before
-    Tuple<String> largerTup;
-    Tuple<String> smallerTup;
-    if (tup1.size() >= tup2.size()) {
-      largerTup = tup1;
-      smallerTup = tup2;
-    } else {
-      largerTup = tup2;
-      smallerTup = tup1;
-    }
-    // number of leading elements of the larger tuple that have no partner in the smaller one
-    int gap = largerTup.size() - smallerTup.size();
-
-    for (int i = 0; i < largerTup.size(); i++) {
-      if (i < gap) {
-        // unpaired leading element: copied through as-is, not re-formatted
-        outputTup.add(largerTup.getElement(i));
-      } else {
-        double left = Double.parseDouble(largerTup.getElement(i));
-        double right = Double.parseDouble(smallerTup.getElement(i - gap));
-        double combined = multiply ? left * right : left + right;
-        outputTup.add(String.valueOf(combined));
-      }
-    }
-    return outputTup;
-  }
-
-  /**
-   * Applies this operator to the given tuple lists.
-   * @param input one iterator per input tuple list
-   * @return one iterator per output tuple list
-   */
-  public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
deleted file mode 100644
index 6895128..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Simple holder for a named stat with its value tuple and timestamp tuple.
- */
-public class Stat {
-  String _name;
-  Tuple<String> _value;
-  Tuple<String> _timestamp;
-
-  /**
-   * @param name stat name
-   * @param value value tuple
-   * @param timestamp timestamp tuple
-   */
-  public Stat(String name, Tuple<String> value, Tuple<String> timestamp) {
-    this._name = name;
-    this._value = value;
-    this._timestamp = timestamp;
-  }
-
-  /** @return the stat name passed at construction */
-  public String getName() {
-    return this._name;
-  }
-
-  /** @return the value tuple passed at construction */
-  public Tuple<String> getValue() {
-    return this._value;
-  }
-
-  /** @return the timestamp tuple passed at construction */
-  public Tuple<String> getTimestamp() {
-    return this._timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
deleted file mode 100644
index 1538eb8..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.stages.HealthDataCache;
-import org.apache.helix.model.PersistentStats;
-import org.apache.log4j.Logger;
-
-public class StatsHolder {
-  enum MatchResult {
-    WILDCARDMATCH,
-    EXACTMATCH,
-    NOMATCH
-  };
-
-  private static final Logger logger = Logger.getLogger(StatsHolder.class.getName());
-
-  public static final String VALUE_NAME = "value";
-  public static final String TIMESTAMP_NAME = "TimeStamp";
-
-  HelixDataAccessor _accessor;
-  HealthDataCache _cache;
-
-  Map<String, Map<String, String>> _statMap;
-  Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
-  private Builder _keyBuilder;
-
-  // PersistentStats _persistentStats;
-
-  public StatsHolder(HelixManager manager, HealthDataCache cache) {
-    _accessor = manager.getHelixDataAccessor();
-    _cache = cache;
-    _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
-    updateCache(_cache);
-    _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-
-  }
-
-  public void refreshStats() {
-    logger.info("Refreshing cached stats");
-    _cache.refresh(_accessor);
-    updateCache(_cache);
-  }
-
-  public void persistStats() {
-    // XXX: Am I using _accessor too directly here?
-    // took around 35 ms from desktop to ESV4 machine
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-    if (stats == null) {
-      stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
-      // this record, if it
-      // matters
-    }
-    stats.getRecord().setMapFields(_statMap);
-    boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(), stats);
-  }
-
-  public void getStatsFromCache(boolean refresh) {
-    long refreshStartTime = System.currentTimeMillis();
-    if (refresh) {
-      _cache.refresh(_accessor);
-    }
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null) {
-      _statMap = persistentStatRecord.getMapFields();
-    } else {
-      _statMap = new HashMap<String, Map<String, String>>();
-    }
-    /*
-     * if (_cache.getPersistentStats() != null) {
-     * _statMap = _cache.getPersistentStats();
-     * }
-     */
-    // TODO: confirm this a good place to init the _statMap when null
-    /*
-     * if (_statMap == null) {
-     * _statMap = new HashMap<String, Map<String, String>>();
-     * }
-     */
-    System.out.println("Refresh stats done: " + (System.currentTimeMillis() - refreshStartTime));
-  }
-
-  public Iterator<String> getAllStats() {
-    return null;
-  }
-
-  /*
-   * TODO: figure out pre-conditions here. I think not allowing anything to be
-   * null on input
-   */
-  public Map<String, String> mergeStats(String statName, Map<String, String> existingStat,
-      Map<String, String> incomingStat) throws HelixException {
-    if (existingStat == null) {
-      throw new HelixException("existing stat for merge is null");
-    }
-    if (incomingStat == null) {
-      throw new HelixException("incoming stat for merge is null");
-    }
-    // get agg type and arguments, then get agg object
-    String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
-    String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
-    Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
-    // XXX: some of below lines might fail with null exceptions
-
-    // get timestamps, values out of zk maps
-    String existingTime = existingStat.get(TIMESTAMP_NAME);
-    String existingVal = existingStat.get(VALUE_NAME);
-    String incomingTime = incomingStat.get(TIMESTAMP_NAME);
-    String incomingVal = incomingStat.get(VALUE_NAME);
-    // parse values into tuples, if the values exist. else, tuples are null
-    Tuple<String> existingTimeTuple =
-        (existingTime != null) ? Tuple.fromString(existingTime) : null;
-    Tuple<String> existingValueTuple = (existingVal != null) ? Tuple.fromString(existingVal) : null;
-    Tuple<String> incomingTimeTuple =
-        (incomingTime != null) ? Tuple.fromString(incomingTime) : null;
-    Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple.fromString(incomingVal) : null;
-
-    // dp merge
-    agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple, incomingTimeTuple, aggArgs);
-    // put merged tuples back in map
-    Map<String, String> mergedMap = new HashMap<String, String>();
-    if (existingTimeTuple.size() == 0) {
-      throw new HelixException("merged time tuple has size zero");
-    }
-    if (existingValueTuple.size() == 0) {
-      throw new HelixException("merged value tuple has size zero");
-    }
-
-    mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
-    mergedMap.put(VALUE_NAME, existingValueTuple.toString());
-    return mergedMap;
-  }
-
-  /*
-   * Find all persisted stats this stat matches. Update those stats. An incoming
-   * stat can match multiple stats exactly (if that stat has multiple agg types)
-   * An incoming stat can match multiple wildcard stats
-   */
-
-  // need to do a time check here!
-
-  public void applyStat(String incomingStatName, Map<String, String> statFields) {
-    // TODO: consider locking stats here
-    // refreshStats(); //will have refreshed by now during stage
-
-    Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-
-    if (!_statAlertMatchResult.containsKey(incomingStatName)) {
-      _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
-    }
-    Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
-    // traverse through all persistent stats
-    for (String key : _statMap.keySet()) {
-      if (resultMap.containsKey(key)) {
-        MatchResult cachedMatchResult = resultMap.get(key);
-        if (cachedMatchResult == MatchResult.EXACTMATCH) {
-          processExactMatch(key, statFields);
-        } else if (cachedMatchResult == MatchResult.WILDCARDMATCH) {
-          processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
-        }
-        // don't care about NOMATCH
-        continue;
-      }
-      // exact match on stat and stat portion of persisted stat, just update
-      if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName)) {
-        processExactMatch(key, statFields);
-        resultMap.put(key, MatchResult.EXACTMATCH);
-      }
-      // wildcard match
-      else if (ExpressionParser.isIncomingStatWildcardMatch(key, incomingStatName)) {
-        processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
-        resultMap.put(key, MatchResult.WILDCARDMATCH);
-      } else {
-        resultMap.put(key, MatchResult.NOMATCH);
-      }
-    }
-    _statMap.putAll(pendingAdds);
-  }
-
-  void processExactMatch(String key, Map<String, String> statFields) {
-    Map<String, String> mergedStat = mergeStats(key, _statMap.get(key), statFields);
-    // update in place, no problem with hash map
-    _statMap.put(key, mergedStat);
-  }
-
-  void processWildcardMatch(String incomingStatName, String key, Map<String, String> statFields,
-      Map<String, Map<String, String>> pendingAdds) {
-
-    // make sure incoming stat doesn't already exist, either in previous
-    // round or this round
-    // form new key (incomingStatName with agg type from the wildcarded
-    // stat)
-    String statToAdd = ExpressionParser.getWildcardStatSubstitution(key, incomingStatName);
-    // if the stat already existed in _statMap, we have/will apply it as an
-    // exact match
-    // if the stat was added this round to pendingAdds, no need to recreate
-    // (it would have same value)
-    if (!_statMap.containsKey(statToAdd) && !pendingAdds.containsKey(statToAdd)) {
-      // add this stat to persisted stats
-      Map<String, String> mergedStat = mergeStats(statToAdd, getEmptyStat(), statFields);
-      // add to pendingAdds so we don't mess up ongoing traversal of
-      // _statMap
-      pendingAdds.put(statToAdd, mergedStat);
-    }
-  }
-
-  // add parsing of stat (or is that in expression holder?) at least add
-  // validate
-  public void addStat(String exp) throws HelixException {
-    refreshStats(); // get current stats
-
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
-    for (String stat : parsedStats) {
-      if (_statMap.containsKey(stat)) {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      _statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-  }
-
-  public static Map<String, Map<String, String>> parseStat(String exp) throws HelixException {
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-    Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
-    for (String stat : parsedStats) {
-      if (statMap.containsKey(stat)) {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-    return statMap;
-  }
-
-  public static Map<String, String> getEmptyStat() {
-    Map<String, String> statFields = new HashMap<String, String>();
-    statFields.put(TIMESTAMP_NAME, "");
-    statFields.put(VALUE_NAME, "");
-    return statFields;
-  }
-
-  public List<Stat> getStatsList() {
-    List<Stat> stats = new LinkedList<Stat>();
-    for (String stat : _statMap.keySet()) {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      Stat s = new Stat(stat, valTup, timeTup);
-      stats.add(s);
-    }
-    return stats;
-  }
-
-  public Map<String, Tuple<String>> getStatsMap() {
-    // refreshStats(); //don't refresh, stage will have refreshed by this time
-    HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
-    for (String stat : _statMap.keySet()) {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      stats.put(stat, valTup);
-    }
-    return stats;
-  }
-
-  public void updateCache(HealthDataCache cache) {
-    _cache = cache;
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null) {
-      _statMap = persistentStatRecord.getMapFields();
-    } else {
-      _statMap = new HashMap<String, Map<String, String>>();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 2cc733f..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
-  public SumEachOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = true;
-    numOutputTupleLists = -1;
-  }
-
-  // for each column, generate sum
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
-    for (Iterator<Tuple<String>> currIt : input) {
-      Tuple<String> currSum = null;
-      while (currIt.hasNext()) {
-        currSum = sumTuples(currSum, currIt.next());
-      }
-      ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
-      currOutList.add(currSum);
-      out.add(currOutList.iterator());
-    }
-    return out;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
deleted file mode 100644
index 90c9ab0..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
-  public SumOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = false;
-    numOutputTupleLists = 1;
-  }
-
-  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
-    List out = new ArrayList();
-    out.add(input.iterator());
-    return out;
-  }
-
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-    if (input == null || input.size() == 0) {
-      return singleSetToIter(output);
-    }
-    while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
-                   // progress)
-      Tuple<String> rowSum = null;
-      for (Iterator<Tuple<String>> it : input) {
-        if (!it.hasNext()) { // when any iterator runs out, we are done
-          return singleSetToIter(output);
-        }
-        rowSum = sumTuples(rowSum, it.next());
-      }
-      output.add(rowSum);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
deleted file mode 100644
index e57f088..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class Tuple<T> {
-  List<T> elements;
-
-  public Tuple() {
-    elements = new ArrayList<T>();
-  }
-
-  public int size() {
-    return elements.size();
-  }
-
-  public void add(T entry) {
-    elements.add(entry);
-  }
-
-  public void addAll(Tuple<T> incoming) {
-    elements.addAll(incoming.getElements());
-  }
-
-  public Iterator<T> iterator() {
-    return elements.listIterator();
-  }
-
-  public T getElement(int ind) {
-    return elements.get(ind);
-  }
-
-  public List<T> getElements() {
-    return elements;
-  }
-
-  public void clear() {
-    elements.clear();
-  }
-
-  public static Tuple<String> fromString(String in) {
-    Tuple<String> tup = new Tuple<String>();
-    if (in.length() > 0) {
-      String[] elements = in.split(",");
-      for (String element : elements) {
-        tup.add(element);
-      }
-    }
-    return tup;
-  }
-
-  public String toString() {
-    StringBuilder out = new StringBuilder();
-    Iterator<T> it = iterator();
-    boolean outEmpty = true;
-    while (it.hasNext()) {
-      if (!outEmpty) {
-        out.append(",");
-      }
-      out.append(it.next());
-      outEmpty = false;
-    }
-    return out.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 6ef4cfe..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class WindowAggregator extends Aggregator {
-
-  int _windowSize;
-
-  public WindowAggregator(String windowSize) {
-    _windowSize = Integer.parseInt(windowSize);
-    _numArgs = 1;
-  }
-
-  public WindowAggregator() {
-    this("1");
-  }
-
-  @Override
-  public void merge(Tuple<String> currValTup, Tuple<String> newValTup, Tuple<String> currTimeTup,
-      Tuple<String> newTimeTup, String... args) {
-
-    _windowSize = Integer.parseInt(args[0]);
-
-    // figure out how many curr tuple values we displace
-    Tuple<String> mergedTimeTuple = new Tuple<String>();
-    Tuple<String> mergedValTuple = new Tuple<String>();
-
-    Iterator<String> currTimeIter = currTimeTup.iterator();
-    Iterator<String> currValIter = currValTup.iterator();
-    Iterator<String> newTimeIter = newTimeTup.iterator();
-    Iterator<String> newValIter = newValTup.iterator();
-    int currCtr = 0;
-    // traverse current vals
-    double currTime = -1;
-    double currVal;
-    while (currTimeIter.hasNext()) {
-      currTime = Double.parseDouble(currTimeIter.next());
-      currVal = Double.parseDouble(currValIter.next());
-      currCtr++;
-      // number of evicted currVals equal to total size of both minus _windowSize
-      if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted
-                                                                              // element, just bump
-                                                                              // down
-        mergedTimeTuple.add(String.valueOf(currTime));
-        mergedValTuple.add(String.valueOf(currVal));
-      }
-    }
-
-    double newVal;
-    double newTime;
-    while (newTimeIter.hasNext()) {
-      newVal = Double.parseDouble(newValIter.next());
-      newTime = Double.parseDouble(newTimeIter.next());
-      if (newTime <= currTime) { // oldest new time older than newest curr time. we will not apply
-                                 // new tuple!
-        return; // curr tuples remain the same
-      }
-      currCtr++;
-      if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted element
-        mergedTimeTuple.add(String.valueOf(newTime));
-        mergedValTuple.add(String.valueOf(newVal));
-      }
-    }
-    // set curr tuples to merged tuples
-    currTimeTup.clear();
-    currTimeTup.addAll(mergedTimeTuple);
-    currValTup.clear();
-    currValTup.addAll(mergedValTuple);
-    // TODO: see if we can do merger in place on curr
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
deleted file mode 100644
index bf1d9a6..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for Helix alerts
- */
-package org.apache.helix.alerts;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index c85dd0b..92fb636 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -26,15 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Controller;
 import org.apache.helix.api.Participant;
@@ -61,7 +56,6 @@ import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,7 +67,6 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.ProvisionerConfigHolder;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
@@ -521,14 +514,6 @@ public class ClusterAccessor {
   }
 
   /**
-   * Get the stats persisted on this cluster
-   * @return PersistentStats, or null if none persisted
-   */
-  public PersistentStats readStats() {
-    return _accessor.getProperty(_keyBuilder.persistantStat());
-  }
-
-  /**
    * Read the persisted controller contexts
    * @return map of context id to controller context
    */
@@ -556,152 +541,6 @@ public class ClusterAccessor {
   }
 
   /**
-   * Get the current cluster stats
-   * @return PersistentStats
-   */
-  public PersistentStats getStats() {
-    return _accessor.getProperty(_keyBuilder.persistantStat());
-  }
-
-  /**
-   * Get the current cluster alerts
-   * @return Alerts
-   */
-  public Alerts getAlerts() {
-    return _accessor.getProperty(_keyBuilder.alerts());
-  }
-
-  /**
-   * Add a statistic specification to the cluster. Existing stat specifications will not be
-   * overwritten
-   * @param statName string representing a stat specification
-   * @return true if the stat spec was added, false otherwise
-   */
-  public boolean addStat(final String statName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord statsRec) {
-        if (statsRec == null) {
-          statsRec = new ZNRecord(PersistentStats.nodeName);
-        }
-        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
-        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-        for (String newStat : newStatMap.keySet()) {
-          if (!currStatMap.containsKey(newStat)) {
-            currStatMap.put(newStat, newStatMap.get(newStat));
-          }
-        }
-        statsRec.setMapFields(currStatMap);
-        return statsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Remove a statistic specification from the cluster
-   * @param statName string representing a statistic specification
-   * @return true if stats removed, false otherwise
-   */
-  public boolean dropStat(final String statName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord statsRec) {
-        if (statsRec == null) {
-          throw new HelixException("No stats record in ZK, nothing to drop");
-        }
-        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
-        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-        // delete each stat from stat map
-        for (String newStat : newStatMap.keySet()) {
-          if (currStatMap.containsKey(newStat)) {
-            currStatMap.remove(newStat);
-          }
-        }
-        statsRec.setMapFields(currStatMap);
-        return statsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Add an alert specification to the cluster
-   * @param alertName string representing the alert spec
-   * @return true if added, false otherwise
-   */
-  public boolean addAlert(final String alertName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    String alertsPath = _keyBuilder.alerts().getPath();
-    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec) {
-        if (alertsRec == null) {
-          alertsRec = new ZNRecord(Alerts.nodeName);
-        }
-        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
-        StringBuilder newStatName = new StringBuilder();
-        Map<String, String> newAlertMap = new HashMap<String, String>();
-
-        // use AlertsHolder to get map of new stats and map for this alert
-        AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
-        // add stat
-        addStat(newStatName.toString());
-
-        // add alert
-        currAlertMap.put(alertName, newAlertMap);
-        alertsRec.setMapFields(currAlertMap);
-        return alertsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Remove an alert specification from the cluster
-   * @param alertName string representing an alert specification
-   * @return true if removed, false otherwise
-   */
-  public boolean dropAlert(final String alertName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String alertsPath = _keyBuilder.alerts().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec) {
-        if (alertsRec == null) {
-          throw new HelixException("No alerts record persisted, nothing to drop");
-        }
-        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
-        currAlertMap.remove(alertName);
-        alertsRec.setMapFields(currAlertMap);
-        return alertsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
    * Add user configuration to the existing cluster user configuration. Overwrites properties with
    * the same key
    * @param userConfig the user config key-value pairs to add

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index f9af914..7bb214e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -33,7 +33,6 @@ import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.IdealStateChangeListener;
@@ -66,7 +65,6 @@ import org.apache.helix.controller.stages.ResourceValidationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -76,23 +74,21 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 /**
- * Cluster Controllers main goal is to keep the cluster state as close as possible to
- * Ideal State. It does this by listening to changes in cluster state and scheduling new
- * tasks to get cluster state to best possible ideal state. Every instance of this class
- * can control can control only one cluster
- * Get all the partitions use IdealState, CurrentState and Messages <br>
+ * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
+ * It does this by listening to changes in cluster state and scheduling new tasks to get cluster
+ * state to best possible ideal state. Every instance of this class can control can control only one
+ * cluster Get all the partitions use IdealState, CurrentState and Messages <br>
  * foreach partition <br>
  * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
- * 2. compute best possible state (instance,state) pair. This needs previous step data and
- * state model constraints <br>
+ * 2. compute best possible state (instance,state) pair. This needs previous step data and state
+ * model constraints <br>
  * 3. compute the messages/tasks needed to move to 1 to 2 <br>
  * 4. select the messages that can be sent, needs messages and state model constraints <br>
  * 5. send messages
  */
 public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ExternalViewChangeListener, ControllerChangeListener, HealthStateChangeListener,
-    InstanceConfigChangeListener {
+    ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   volatile boolean init = false;
   private final PipelineRegistry _registry;
@@ -109,15 +105,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   private final ClusterEventProcessor _eventThread;
 
   /**
-   * The _paused flag is checked by function handleEvent(), while if the flag is set
-   * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
-   * is set.
+   * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
+   * will be no-op. Other event handling logic keeps the same when the flag is set.
    */
   private boolean _paused;
 
   /**
-   * The timer that can periodically run the rebalancing pipeline. The timer will start if there
-   * is one resource group has the config to use the timer.
+   * The timer that can periodically run the rebalancing pipeline. The timer will start if there is
+   * one resource group has the config to use the timer.
    */
   Timer _rebalanceTimer = null;
   int _timerPeriod = Integer.MAX_VALUE;
@@ -128,9 +123,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   private ClusterDataCache _cache;
 
   /**
-   * Default constructor that creates a default pipeline registry. This is sufficient in
-   * most cases, but if there is a some thing specific needed use another constructor
-   * where in you can pass a pipeline registry
+   * Default constructor that creates a default pipeline registry. This is sufficient in most cases,
+   * but if there is a some thing specific needed use another constructor where in you can pass a
+   * pipeline registry
    */
   public GenericHelixController() {
     this(createDefaultRegistry());
@@ -160,9 +155,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
   // TODO who should stop this timer
   /**
-   * Starts the rebalancing timer with the specified period. Start the timer if necessary;
-   * If the period is smaller than the current period, cancel the current timer and use
-   * the new period.
+   * Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
+   * period is smaller than the current period, cancel the current timer and use the new period.
    */
   void startRebalancingTimer(int period, HelixManager manager) {
     logger.info("Controller starting timer at period " + period);
@@ -227,7 +221,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry
           .register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
-
       return registry;
     }
   }
@@ -245,8 +238,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   }
 
   /**
-   * lock-always: caller always needs to obtain an external lock before call, calls to
-   * handleEvent() should be serialized
+   * lock-always: caller always needs to obtain an external lock before call, calls to handleEvent()
+   * should be serialized
    * @param event
    */
   protected synchronized void handleEvent(ClusterEvent event) {
@@ -345,16 +338,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   }
 
   @Override
-  public void onHealthChange(String instanceName, List<HealthStat> reports,
-      NotificationContext changeContext) {
-    /**
-     * When there are more participant ( > 20, can be in hundreds), This callback can be
-     * called quite frequently as each participant reports health stat every minute. Thus
-     * we change the health check pipeline to run in a timer callback.
-     */
-  }
-
-  @Override
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");
@@ -526,9 +509,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   }
 
   /**
-   * Go through the list of liveinstances in the cluster, and add currentstateChange
-   * listener and Message listeners to them if they are newly added. For current state
-   * change, the observation is tied to the session id of each live instance.
+   * Go through the list of liveinstances in the cluster, and add currentstateChange listener and
+   * Message listeners to them if they are newly added. For current state change, the observation is
+   * tied to the session id of each live instance.
    */
   protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 6b29e2d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.AlertStatus;
-import org.apache.helix.model.Alerts;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-
-public class HealthDataCache {
-  Map<String, LiveInstance> _liveInstanceMap;
-
-  Map<String, Map<String, HealthStat>> _healthStatMap;
-  HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  PersistentStats _persistentStats;
-  Alerts _alerts;
-  AlertStatus _alertStatus;
-
-  public HealthStat getGlobalStats() {
-    return _globalStats;
-  }
-
-  public PersistentStats getPersistentStats() {
-    return _persistentStats;
-  }
-
-  public Alerts getAlerts() {
-    return _alerts;
-  }
-
-  public AlertStatus getAlertStatus() {
-    return _alertStatus;
-  }
-
-  public Map<String, HealthStat> getHealthStats(String instanceName) {
-    Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-    if (map != null) {
-      return map;
-    } else {
-      return Collections.emptyMap();
-    }
-  }
-
-  public Map<String, LiveInstance> getLiveInstances() {
-    return _liveInstanceMap;
-  }
-
-  public boolean refresh(HelixDataAccessor accessor) {
-    Builder keyBuilder = accessor.keyBuilder();
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
-    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
-    for (String instanceName : _liveInstanceMap.keySet()) {
-      // xxx clearly getting znodes for the instance here...so get the
-      // timestamp!
-
-      Map<String, HealthStat> childValuesMap =
-          accessor.getChildValuesMap(keyBuilder.healthReports(instanceName));
-      hsMap.put(instanceName, childValuesMap);
-    }
-    _healthStatMap = Collections.unmodifiableMap(hsMap);
-    _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
-    _alerts = accessor.getProperty(keyBuilder.alerts());
-    _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
-    return true;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/38b43965/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index 859c1d0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-
-public class ReadHealthDataStage extends AbstractBaseStage {
-  HealthDataCache _cache;
-
-  public ReadHealthDataStage() {
-    _cache = new HealthDataCache();
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null) {
-      throw new StageException("HelixManager attribute value is null");
-    }
-    // DataAccessor dataAccessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    _cache.refresh(accessor);
-
-    event.addAttribute("HealthDataCache", _cache);
-
-    long processLatency = System.currentTimeMillis() - startTime;
-    addLatencyToMonitor(event, processLatency);
-  }
-}


Mime
View raw message