helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [10/42] Refactoring the package names and removing jsql parser
Date Wed, 24 Oct 2012 23:14:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
new file mode 100644
index 0000000..e0d048e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
@@ -0,0 +1,123 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+public class DummyParticipant
+{
+  // dummy master-slave state model
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+  public static class DummyMSStateModel extends StateModel
+  {
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes SLAVE from OFFLINE for " + partitionName);
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes MASTER from SLAVE for " + partitionName);
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes SLAVE from MASTER for " + partitionName);
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes OFFLINE from SLAVE for " + partitionName);
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes DROPPED from OFFLINE for " + partitionName);
+    }
+
+    @Transition(to = "OFFLINE", from = "ERROR")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context)
+    {
+      String partitionName = message.getPartitionName();
+      String instanceName = message.getTgtName();
+      System.out.println(instanceName + " becomes OFFLINE from ERROR for " + partitionName);
+    }
+
+    @Override
+    public void reset()
+    {
+      System.out.println("Default MockMSStateModel.reset() invoked");
+    }
+  }
+
+  // dummy master slave state model factory
+  public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel>
+  {
+    @Override
+    public DummyMSStateModel createNewStateModel(String partitionName)
+    {
+      DummyMSStateModel model = new DummyMSStateModel();
+      return model;
+    }
+  }
+
+  public static void main(String[] args)
+  {
+    if (args.length < 3)
+    {
+      System.err.println("USAGE: DummyParticipant zkAddress clusterName instanceName");
+      System.exit(1);
+    }
+
+    String zkAddr = args[0];
+    String clusterName = args[1];
+    String instanceName = args[2];
+
+    HelixManager manager = null;
+    try
+    {
+      manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+          InstanceType.PARTICIPANT, zkAddr);
+
+      StateMachineEngine stateMach = manager.getStateMachineEngine();
+      DummyMSModelFactory msModelFactory = new DummyMSModelFactory();
+      stateMach.registerStateModelFactory("MasterSlave", msModelFactory);
+
+      manager.connect();
+
+      Thread.currentThread().join();
+    } catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } finally
+    {
+      if (manager != null)
+      {
+        manager.disconnect();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
new file mode 100644
index 0000000..f27fdad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
@@ -0,0 +1,56 @@
+package org.apache.helix.examples;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+
+public class ExampleHelper
+{
+  
+  
+  public static ZkServer startZkServer(String zkAddr)
+  {
+    System.out.println("Start zookeeper at " + zkAddr + " in thread "
+        + Thread.currentThread().getName());
+
+    String zkDir = zkAddr.replace(':', '_');
+    final String logDir = "/tmp/" + zkDir + "/logs";
+    final String dataDir = "/tmp/" + zkDir + "/dataDir";
+    try
+    {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
+      {
+        // do nothing
+      }
+    };
+
+    int port = Integer.parseInt(zkAddr.substring(zkAddr.lastIndexOf(':') + 1));
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+
+    return zkServer;
+  }
+  
+  public static void stopZkServer(ZkServer zkServer)
+  {
+    if (zkServer != null)
+    {
+      zkServer.shutdown();
+      System.out.println("Shut down zookeeper at port " + zkServer.getPort()
+          + " in thread " + Thread.currentThread().getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
new file mode 100644
index 0000000..44eb75b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+public class ExampleProcess
+{
+
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String hostAddress = "host";
+  public static final String hostPort = "port";
+  public static final String relayCluster = "relayCluster";
+  public static final String help = "help";
+  public static final String configFile = "configFile";
+  public static final String stateModel = "stateModelType";
+  public static final String transDelay = "transDelay";
+
+  private final String zkConnectString;
+  private final String clusterName;
+  private final String instanceName;
+  private final String stateModelType;
+  private HelixManager manager;
+
+//  private StateMachineEngine genericStateMachineHandler;
+
+  private String _file = null;
+  private StateModelFactory<StateModel> stateModelFactory;
+  private final int delay;
+
+  public ExampleProcess(String zkConnectString, String clusterName,
+      String instanceName, String file, String stateModel, int delay)
+  {
+    this.zkConnectString = zkConnectString;
+    this.clusterName = clusterName;
+    this.instanceName = instanceName;
+    this._file = file;
+    stateModelType = stateModel;
+    this.delay = delay;
+  }
+
+  public void start() throws Exception
+  {
+    if (_file == null)
+    {
+      manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                          instanceName,
+                                                          InstanceType.PARTICIPANT,
+                                                          zkConnectString);
+
+    }
+    else
+    {
+      manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
+                                                                  instanceName,
+                                                                  InstanceType.PARTICIPANT,
+                                                                  _file);
+
+    }
+
+    if ("MasterSlave".equalsIgnoreCase(stateModelType))
+    {
+      stateModelFactory = new MasterSlaveStateModelFactory(delay);
+    } else if ("OnlineOffline".equalsIgnoreCase(stateModelType))
+    {
+      stateModelFactory = new OnlineOfflineStateModelFactory(delay);
+    } else if ("LeaderStandby".equalsIgnoreCase(stateModelType))
+    {
+      stateModelFactory = new LeaderStandbyStateModelFactory(delay);
+    }
+//    genericStateMachineHandler = new StateMachineEngine();
+//    genericStateMachineHandler.registerStateModelFactory(stateModelType, stateModelFactory);
+    
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
+    manager.connect();
+    manager.getMessagingService().registerMessageHandlerFactory(
+        MessageType.STATE_TRANSITION.toString(), stateMach);
+    if (_file != null)
+    {
+      ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName,
+          stateModelFactory);
+
+    }
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+        .withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption = OptionBuilder.withLongOpt(hostPort)
+        .withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
+        .withDescription("StateModel Type").create();
+    stateModelOption.setArgs(1);
+    stateModelOption.setRequired(true);
+    stateModelOption.setArgName("StateModel Type (Required)");
+
+    // add an option group including either --zkSvr or --configFile
+    Option fileOption = OptionBuilder.withLongOpt(configFile)
+        .withDescription("Provide file to read states/messages").create();
+    fileOption.setArgs(1);
+    fileOption.setRequired(true);
+    fileOption.setArgName("File to read states/messages (Optional)");
+
+    Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
+        .withDescription("Provide state trans delay").create();
+    transDelayOption.setArgs(1);
+    transDelayOption.setRequired(false);
+    transDelayOption.setArgName("Delay time in state transition, in MS");
+
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+    optionGroup.addOption(fileOption);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    // options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+    options.addOption(stateModelOption);
+    options.addOption(transDelayOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + ExampleProcess.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+      throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      System.err
+          .println("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    String zkConnectString = "localhost:2181";
+    String clusterName = "storage-integration-cluster";
+    String instanceName = "localhost_8905";
+    String file = null;
+    String stateModelValue = "MasterSlave";
+    int delay = 0;
+    boolean skipZeroArgs = true;// false is for dev testing
+    if (!skipZeroArgs || args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkConnectString = cmd.getOptionValue(zkServer);
+      clusterName = cmd.getOptionValue(cluster);
+
+      String host = cmd.getOptionValue(hostAddress);
+      String portString = cmd.getOptionValue(hostPort);
+      int port = Integer.parseInt(portString);
+      instanceName = host + "_" + port;
+
+      file = cmd.getOptionValue(configFile);
+      if (file != null)
+      {
+        File f = new File(file);
+        if (!f.exists())
+        {
+          System.err.println("static config file doesn't exist");
+          System.exit(1);
+        }
+      }
+
+      stateModelValue = cmd.getOptionValue(stateModel);
+      if (cmd.hasOption(transDelay))
+      {
+        try
+        {
+          delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+          if (delay < 0)
+          {
+            throw new Exception("delay must be positive");
+          }
+        } catch (Exception e)
+        {
+          e.printStackTrace();
+          delay = 0;
+        }
+      }
+    }
+    // Espresso_driver.py will consume this
+    System.out.println("Starting Process with ZK:" + zkConnectString);
+
+    ExampleProcess process = new ExampleProcess(zkConnectString, clusterName,
+        instanceName, file, stateModelValue, delay);
+
+    process.start();
+    Thread.currentThread().join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
new file mode 100644
index 0000000..f090ace
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -0,0 +1,164 @@
+package org.apache.helix.examples;
+
+import java.io.File;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+
+/**
+ * Ideal state json format file used in this example for CUSTOMIZED ideal state mode
+ * <p>
+ * <pre>
+ * {
+ * "id" : "TestDB",
+ * "mapFields" : {
+ *   "TestDB_0" : {
+ *     "localhost_12918" : "MASTER",
+ *     "localhost_12919" : "SLAVE",
+ *     "localhost_12920" : "SLAVE"
+ *   },
+ *   "TestDB_1" : {
+ *     "localhost_12918" : "MASTER",
+ *     "localhost_12919" : "SLAVE",
+ *     "localhost_12920" : "SLAVE"
+ *   },
+ *   "TestDB_2" : {
+ *     "localhost_12918" : "MASTER",
+ *     "localhost_12919" : "SLAVE",
+ *     "localhost_12920" : "SLAVE"
+ *   },
+ *   "TestDB_3" : {
+ *     "localhost_12918" : "MASTER",
+ *     "localhost_12919" : "SLAVE",
+ *     "localhost_12920" : "SLAVE"
+ *   }
+ * },
+ * "listFields" : {
+ * },
+ * "simpleFields" : {
+ *   "IDEAL_STATE_MODE" : "CUSTOMIZED",
+ *   "NUM_PARTITIONS" : "4",
+ *   "REPLICAS" : "3",
+ *   "STATE_MODEL_DEF_REF" : "MasterSlave",
+ *   "STATE_MODEL_FACTORY_NAME" : "DEFAULT"
+ * }
+ * }
+ * </pre>
+ * 
+ */
+
+public class IdealStateExample
+{
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length < 3)
+    {
+      System.err.println("USAGE: IdealStateExample zkAddress clusterName idealStateMode (AUTO, AUTO_REBALANCE, or CUSTOMIZED) idealStateJsonFile (required for CUSTOMIZED mode)");
+      System.exit(1);
+    }
+
+    final String zkAddr = args[0];
+    final String clusterName = args[1];
+    final String idealStateModeStr = args[2].toUpperCase();
+    String idealStateJsonFile = null;
+    IdealStateModeProperty idealStateMode =
+        IdealStateModeProperty.valueOf(idealStateModeStr);
+    if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
+    {
+      if (args.length < 4)
+      {
+        System.err.println("Missng idealStateJsonFile for CUSTOMIZED ideal state mode");
+        System.exit(1);
+      }
+      idealStateJsonFile = args[3];
+    }
+
+    // add cluster {clusterName}
+    ZkClient zkclient =
+        new ZkClient(zkAddr,
+                     ZkClient.DEFAULT_SESSION_TIMEOUT,
+                     ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                     new ZNRecordSerializer());
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+    admin.addCluster(clusterName, true);
+
+    // add MasterSlave state mode definition
+    StateModelConfigGenerator generator = new StateModelConfigGenerator();
+    admin.addStateModelDef(clusterName,
+                           "MasterSlave",
+                           new StateModelDefinition(generator.generateConfigForMasterSlave()));
+
+    // add 3 participants: "localhost:{12918, 12919, 12920}"
+    for (int i = 0; i < 3; i++)
+    {
+      int port = 12918 + i;
+      InstanceConfig config = new InstanceConfig("localhost_" + port);
+      config.setHostName("localhost");
+      config.setPort(Integer.toString(port));
+      config.setInstanceEnabled(true);
+      admin.addInstance(clusterName, config);
+    }
+
+    // add resource "TestDB" which has 4 partitions and uses MasterSlave state model
+    String resourceName = "TestDB";
+    if (idealStateMode == IdealStateModeProperty.AUTO
+        || idealStateMode == IdealStateModeProperty.AUTO_REBALANCE)
+    {
+      admin.addResource(clusterName, resourceName, 4, "MasterSlave", idealStateModeStr);
+
+      // rebalance resource "TestDB" using 3 replicas
+      admin.rebalance(clusterName, resourceName, 3);
+    }
+    else if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
+    {
+      admin.addIdealState(clusterName, resourceName, idealStateJsonFile);
+    }
+
+    // start helix controller
+    new Thread(new Runnable()
+    {
+
+      @Override
+      public void run()
+      {
+        try
+        {
+          HelixControllerMain.main(new String[] { "--zkSvr", zkAddr, "--cluster",
+              clusterName });
+        }
+        catch (Exception e)
+        {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+    }).start();
+
+    // start 3 dummy participants
+    for (int i = 0; i < 3; i++)
+    {
+      int port = 12918 + i;
+      final String instanceName = "localhost_" + port;
+      new Thread(new Runnable()
+      {
+
+        @Override
+        public void run()
+        {
+          DummyParticipant.main(new String[] { zkAddr, clusterName, instanceName });
+        }
+      }).start();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
new file mode 100644
index 0000000..ec54970
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class LeaderStandbyStateModelFactory extends
+		StateModelFactory<StateModel> {
+	int _delay;
+
+	public LeaderStandbyStateModelFactory(int delay) {
+		_delay = delay;
+	}
+
+	@Override
+	public StateModel createNewStateModel(String stateUnitKey) {
+		LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
+		stateModel.setDelay(_delay);
+		return stateModel;
+	}
+
+	public static class LeaderStandbyStateModel extends StateModel {
+		int _transDelay = 0;
+
+		public void setDelay(int delay) {
+			_transDelay = delay > 0 ? delay : 0;
+		}
+
+		public void onBecomeLeaderFromStandby(Message message,
+				NotificationContext context) {
+			System.out
+					.println("LeaderStandbyStateModel.onBecomeLeaderFromStandby()");
+			sleep();
+		}
+
+		public void onBecomeStandbyFromLeader(Message message,
+				NotificationContext context) {
+			System.out
+					.println("LeaderStandbyStateModel.onBecomeStandbyFromLeader()");
+			sleep();
+		}
+
+		private void sleep() {
+			try {
+				Thread.sleep(_transDelay);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..b8be32a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+@SuppressWarnings("rawtypes")
+public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel> {
+	int _delay;
+
+	public MasterSlaveStateModelFactory(int delay) {
+		_delay = delay;
+	}
+
+	@Override
+	public StateModel createNewStateModel(String stateUnitKey) {
+		MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
+		stateModel.setDelay(_delay);
+		stateModel.setStateUnitKey(stateUnitKey);
+		return stateModel;
+	}
+
+	public static class MasterSlaveStateModel extends StateModel {
+		int _transDelay = 0;
+		String stateUnitKey;
+		
+		public String getStateUnitKey() {
+			return stateUnitKey;
+		}
+
+		public void setStateUnitKey(String stateUnitKey) {
+			this.stateUnitKey = stateUnitKey;
+		}
+
+		public void setDelay(int delay) {
+			_transDelay = delay > 0 ? delay : 0;
+		}
+
+		public void onBecomeSlaveFromOffline(Message message,
+				NotificationContext context) {
+
+			System.out.println("MasterSlaveStateModel.onBecomeSlaveFromOffline() for "+ stateUnitKey);
+			sleep();
+		}
+
+		private void sleep() {
+			try {
+				Thread.sleep(_transDelay);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		public void onBecomeSlaveFromMaster(Message message,
+				NotificationContext context) {
+			System.out.println("MasterSlaveStateModel.onBecomeSlaveFromMaster() for "+ stateUnitKey);
+			sleep();
+
+		}
+
+		public void onBecomeMasterFromSlave(Message message,
+				NotificationContext context) {
+			System.out.println("MasterSlaveStateModel.onBecomeMasterFromSlave() for "+ stateUnitKey);
+			sleep();
+
+		}
+
+		public void onBecomeOfflineFromSlave(Message message,
+				NotificationContext context) {
+			System.out.println("MasterSlaveStateModel.onBecomeOfflineFromSlave() for "+ stateUnitKey);
+			sleep();
+
+		}
+		
+		public void onBecomeDroppedFromOffline(Message message,
+        NotificationContext context) {
+      System.out.println("ObBecomeDroppedFromOffline() for "+ stateUnitKey);
+      sleep();
+
+    }
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..7b9bebe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class OnlineOfflineStateModelFactory extends
+		StateModelFactory<StateModel> {
+	int _delay;
+
+	public OnlineOfflineStateModelFactory(int delay) {
+		_delay = delay;
+	}
+
+	@Override
+	public StateModel createNewStateModel(String stateUnitKey) {
+		OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
+		stateModel.setDelay(_delay);
+		return stateModel;
+	}
+
+	public static class OnlineOfflineStateModel extends StateModel {
+		int _transDelay = 0;
+
+		public void setDelay(int delay) {
+			_transDelay = delay > 0 ? delay : 0;
+		}
+
+		public void onBecomeOnlineFromOffline(Message message,
+				NotificationContext context) {
+			System.out
+					.println("OnlineOfflineStateModel.onBecomeOnlineFromOffline()");
+			sleep();
+		}
+
+		public void onBecomeOfflineFromOnline(Message message,
+				NotificationContext context) {
+			System.out
+					.println("OnlineOfflineStateModel.onBecomeOfflineFromOnline()");
+			sleep();
+		}
+		
+		public void onBecomeDroppedFromOffline(Message message,
+        NotificationContext context) {
+      System.out.println("OnlineOfflineStateModel.onBecomeDroppedFromOffline()");
+      sleep();
+
+    }
+
+		private void sleep() {
+			try {
+				Thread.sleep(_transDelay);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/package-info.java b/helix-core/src/main/java/org/apache/helix/examples/package-info.java
new file mode 100644
index 0000000..0016cca
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Examples of using Helix cluster manager
+ * 
+ */
+package org.apache.helix.examples;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
new file mode 100644
index 0000000..3816914
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import org.apache.log4j.Logger;
+
+public class AccumulateAggregationType implements AggregationType
+{
+
+  private static final Logger logger = Logger
+      .getLogger(AccumulateAggregationType.class);
+
+  public final static String TYPE_NAME = "accumulate";
+
+  @Override
+  public String getName()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  public String merge(String iv, String ev, long prevTimestamp)
+  {
+    double inVal = Double.parseDouble(iv);
+    double existingVal = Double.parseDouble(ev);
+    return String.valueOf(inVal + existingVal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
new file mode 100644
index 0000000..d331a0f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+public interface AggregationType
+{
+
+  // public abstract <T extends Object> T merge(T iv, T ev);
+
+  public final static String DELIM = "#";
+
+  public abstract String merge(String incomingVal, String existingVal,
+      long prevTimestamp);
+
+  public abstract String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
new file mode 100644
index 0000000..3e43f25
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+public class AggregationTypeFactory
+{
+  private static final Logger logger = Logger
+      .getLogger(AggregationTypeFactory.class);
+
+  public AggregationTypeFactory()
+  {
+  }
+
+  // TODO: modify this function so that it takes a single string, but can parse
+  // apart params from type
+  public static AggregationType getAggregationType(String input)
+  {
+    if (input == null)
+    {
+      logger.error("AggregationType name is null");
+      return null;
+    }
+    StringTokenizer tok = new StringTokenizer(input, AggregationType.DELIM);
+    String type = tok.nextToken();
+    int numParams = tok.countTokens();
+    String[] params = new String[numParams];
+    for (int i = 0; i < numParams; i++)
+    {
+      if (!tok.hasMoreTokens())
+      {
+        logger.error("Trying to parse non-existent params");
+        return null;
+      }
+      params[i] = tok.nextToken();
+    }
+
+    if (type.equals(AccumulateAggregationType.TYPE_NAME))
+    {
+      return new AccumulateAggregationType();
+    }
+    else if (type.equals(DecayAggregationType.TYPE_NAME))
+    {
+      if (params.length < 1)
+      {
+        logger
+            .error("DecayAggregationType must contain <decay weight> parameter");
+        return null;
+      }
+      return new DecayAggregationType(Double.parseDouble(params[0]));
+    }
+    else if (type.equals(WindowAggregationType.TYPE_NAME))
+    {
+      if (params.length < 1)
+      {
+        logger
+            .error("WindowAggregationType must contain <window size> parameter");
+      }
+      return new WindowAggregationType(Integer.parseInt(params[0]));
+    }
+    else
+    {
+      logger.error("Unknown AggregationType " + type);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
new file mode 100644
index 0000000..6cec7f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+public class DecayAggregationType implements AggregationType
+{
+
+  private static final Logger logger = Logger
+      .getLogger(DecayAggregationType.class);
+
+  public final static String TYPE_NAME = "decay";
+
+  double _decayFactor = 0.1;
+
+  public DecayAggregationType(double df)
+  {
+    super();
+    _decayFactor = df;
+  }
+
+  @Override
+  public String getName()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append(TYPE_NAME);
+    sb.append(DELIM);
+    sb.append(_decayFactor);
+    return sb.toString();
+  }
+
+  @Override
+  public String merge(String iv, String ev, long prevTimestamp)
+  {
+    double incomingVal = Double.parseDouble(iv);
+    double existingVal = Double.parseDouble(ev);
+    long currTimestamp = System.currentTimeMillis();
+    double minutesOld = (currTimestamp - prevTimestamp) / 60000.0;
+    // come up with decay coeff for old value. More time passed, the more it
+    // decays
+    double oldDecayCoeff = Math.pow((1 - _decayFactor), minutesOld);
+    return String
+        .valueOf((double) (oldDecayCoeff * existingVal + (1 - oldDecayCoeff)
+            * incomingVal));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
new file mode 100644
index 0000000..ae262eb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+class DefaultHealthReportProvider extends HealthReportProvider
+{
+  private static final Logger _logger = Logger
+      .getLogger(DefaultHealthReportProvider.class);
+
+  public final static String _availableCPUs = "availableCPUs";
+  public final static String _freePhysicalMemory = "freePhysicalMemory";
+  public final static String _totalJvmMemory = "totalJvmMemory";
+  public final static String _freeJvmMemory = "freeJvmMemory";
+  public final static String _averageSystemLoad = "averageSystemLoad";
+
+  public DefaultHealthReportProvider()
+  {
+  }
+
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    OperatingSystemMXBean osMxBean = ManagementFactory
+        .getOperatingSystemMXBean();
+    long freeJvmMemory = Runtime.getRuntime().freeMemory();
+    long totalJvmMemory = Runtime.getRuntime().totalMemory();
+    int availableCPUs = osMxBean.getAvailableProcessors();
+    double avgSystemLoad = osMxBean.getSystemLoadAverage();
+    long freePhysicalMemory = Long.MAX_VALUE;
+
+    try
+    {
+      // if( osMxBean instanceof com.sun.management.OperatingSystemMXBean)
+      // {
+      // com.sun.management.OperatingSystemMXBean sunOsMxBean
+      // = (com.sun.management.OperatingSystemMXBean) osMxBean;
+      // freePhysicalMemory = sunOsMxBean.getFreePhysicalMemorySize();
+      // }
+    }
+    catch (Throwable t)
+    {
+      _logger.error(t);
+    }
+
+    Map<String, String> result = new TreeMap<String, String>();
+
+    result.put(_availableCPUs, "" + availableCPUs);
+    result.put(_freePhysicalMemory, "" + freePhysicalMemory);
+    result.put(_freeJvmMemory, "" + freeJvmMemory);
+    result.put(_totalJvmMemory, "" + totalJvmMemory);
+    result.put(_averageSystemLoad, "" + avgSystemLoad);
+
+    return result;
+  }
+
+  @Override
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+    
+    result.put(getReportName(), getRecentHealthReport());
+    return result;
+  }
+
+  @Override
+  public void resetStats()
+  {
+    // TODO Auto-generated method stub
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
new file mode 100644
index 0000000..cbc9938
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class DefaultPerfCounters extends ZNRecord
+{
+  private static final Logger _logger = Logger
+      .getLogger(DefaultPerfCounters.class);
+
+  public final static String _availableCPUs = "availableCPUs";
+  public final static String _freePhysicalMemory = "freePhysicalMemory";
+  public final static String _totalJvmMemory = "totalJvmMemory";
+  public final static String _freeJvmMemory = "freeJvmMemory";
+  public final static String _averageSystemLoad = "averageSystemLoad";
+
+  public DefaultPerfCounters(String instanceName, long availableCPUs,
+      long freePhysicalMemory, long freeJvmMemory, long totalJvmMemory,
+      double averageSystemLoad)
+  {
+    super("DefaultPerfCounters");
+    setSimpleField("instanceName", instanceName);
+    setSimpleField("createTime", new Date().toString());
+
+    setSimpleField(_availableCPUs, "" + availableCPUs);
+    setSimpleField(_freePhysicalMemory, "" + freePhysicalMemory);
+    setSimpleField(_freeJvmMemory, "" + freeJvmMemory);
+    setSimpleField(_totalJvmMemory, "" + totalJvmMemory);
+    setSimpleField(_averageSystemLoad, "" + averageSystemLoad);
+  }
+
+  public long getAvailableCpus()
+  {
+    return getSimpleLongVal(_availableCPUs);
+  }
+
+  public double getAverageSystemLoad()
+  {
+    return getSimpleDoubleVal(_averageSystemLoad);
+  }
+
+  public long getTotalJvmMemory()
+  {
+    return getSimpleLongVal(_totalJvmMemory);
+  }
+
+  public long getFreeJvmMemory()
+  {
+    return getSimpleLongVal(_freeJvmMemory);
+  }
+
+  public long getFreePhysicalMemory()
+  {
+    return getSimpleLongVal(_freePhysicalMemory);
+  }
+
+  long getSimpleLongVal(String key)
+  {
+    String strVal = getSimpleField(key);
+    if (strVal == null)
+    {
+      return 0;
+    }
+    try
+    {
+      return Long.parseLong(strVal);
+    }
+    catch (Exception e)
+    {
+      _logger.warn(e);
+      return 0;
+    }
+  }
+
+  double getSimpleDoubleVal(String key)
+  {
+    String strVal = getSimpleField(key);
+    if (strVal == null)
+    {
+      return 0;
+    }
+    try
+    {
+      return Double.parseDouble(strVal);
+    }
+    catch (Exception e)
+    {
+      _logger.warn(e);
+      return 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
new file mode 100644
index 0000000..1725208
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Map;
+
+public abstract class HealthReportProvider
+{
+  public static final String _defaultPerfCounters = "defaultPerfCounters";
+
+  public abstract Map<String, String> getRecentHealthReport();
+
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    return null;
+  }
+
+  public abstract void resetStats();
+
+  public String getReportName()
+  {
+    return _defaultPerfCounters;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
new file mode 100644
index 0000000..136ae63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
@@ -0,0 +1,203 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadHealthDataStage;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+import org.apache.log4j.Logger;
+
+
+public class HealthStatsAggregationTask extends HelixTimerTask
+{
+  private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
+
+  public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
+
+  private Timer _timer;
+  private final HelixManager _manager;
+  private final Pipeline _healthStatsAggregationPipeline;
+  private final int _delay;
+  private final int _period;
+  private final ClusterAlertMBeanCollection _alertItemCollection;
+  private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
+      new HashMap<String, HelixStageLatencyMonitor>();
+
+  public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
+  {
+    _manager = manager;
+    _delay = delay;
+    _period = period;
+
+    // health stats pipeline
+    _healthStatsAggregationPipeline = new Pipeline();
+    _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+    StatsAggregationStage statAggregationStage = new StatsAggregationStage();
+    _healthStatsAggregationPipeline.addStage(statAggregationStage);
+    _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
+
+    registerStageLatencyMonitor(_healthStatsAggregationPipeline);
+  }
+
+  public HealthStatsAggregationTask(HelixManager manager)
+  {
+    this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
+  }
+
+  private void registerStageLatencyMonitor(Pipeline pipeline)
+  {
+    for (Stage stage : pipeline.getStages())
+    {
+      String stgName = stage.getStageName();
+      if (!_stageLatencyMonitorMap.containsKey(stgName))
+      {
+        try
+        {
+          _stageLatencyMonitorMap.put(stage.getStageName(),
+                                      new HelixStageLatencyMonitor(_manager.getClusterName(),
+                                                                   stgName));
+        }
+        catch (Exception e)
+        {
+          LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
+        }
+      }
+      else
+      {
+        LOG.error("StageLatencyMonitor for stage: " + stgName
+            + " already exists. Skip register it");
+      }
+    }
+  }
+
+  @Override
+  public void start()
+  {
+    LOG.info("START HealthAggregationTask");
+
+    if (_timer == null)
+    {
+      // Remove all the previous health check values, if any
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
+      for(String healthReportName : existingHealthRecordNames)
+      {
+        LOG.info("Removing old healthrecord " + healthReportName);
+        accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
+      }
+      
+      _timer = new Timer(true);
+      _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
+    }
+    else
+    {
+      LOG.warn("timer already started");
+    }
+  }
+
+  @Override
+  public synchronized void stop()
+  {
+    LOG.info("Stop HealthAggregationTask");
+
+    if (_timer != null)
+    {
+      _timer.cancel();
+      _timer = null;
+      _alertItemCollection.reset();
+
+      for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
+      {
+        stgLatencyMonitor.reset();
+      }
+    }
+    else
+    {
+      LOG.warn("timer already stopped");
+    }
+  }
+
+  @Override
+  public synchronized void run()
+  {
+    if (!isEnabled())
+    {
+      LOG.info("HealthAggregationTask is disabled.");
+      return;
+    }
+    
+    if (!_manager.isLeader())
+    {
+      LOG.error("Cluster manager: " + _manager.getInstanceName()
+          + " is not leader. Pipeline will not be invoked");
+      return;
+    }
+
+    try
+    {
+      ClusterEvent event = new ClusterEvent("healthChange");
+      event.addAttribute("helixmanager", _manager);
+      event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
+
+      _healthStatsAggregationPipeline.handle(event);
+      _healthStatsAggregationPipeline.finish();
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
+                e);
+    }
+  }
+
+  private boolean isEnabled()
+  {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    boolean enabled = true;
+    if (configAccessor != null)
+    {
+      // zk-based cluster manager
+      ConfigScope scope =
+          new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+      String isEnabled = configAccessor.get(scope, "healthChange.enabled");
+      if (isEnabled != null)
+      {
+        enabled = new Boolean(isEnabled);
+      }
+    }
+    else
+    {
+      LOG.debug("File-based cluster manager doesn't support disable healthChange");
+    }
+    return enabled;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
new file mode 100644
index 0000000..55e05fc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import org.apache.helix.ZNRecord;
+
+public interface ParticipantHealthReportCollector
+{
+  public abstract void addHealthReportProvider(HealthReportProvider provider);
+
+  public abstract void removeHealthReportProvider(HealthReportProvider provider);
+
+  public abstract void reportHealthReportMessage(ZNRecord healthReport);
+
+  public abstract void transmitHealthReports();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
new file mode 100644
index 0000000..3174ce5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.model.HealthStat;
+import org.apache.log4j.Logger;
+
+
+public class ParticipantHealthReportCollectorImpl implements
+    ParticipantHealthReportCollector
+{
+  private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
+  private Timer _timer;
+  private static final Logger _logger = Logger
+      .getLogger(ParticipantHealthReportCollectorImpl.class);
+  private final HelixManager _helixManager;
+  String _instanceName;
+  public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
+
+  public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
+      String instanceName)
+  {
+    _helixManager = helixManager;
+    _instanceName = instanceName;
+    addDefaultHealthCheckInfoProvider();
+  }
+
+  private void addDefaultHealthCheckInfoProvider()
+  {
+    addHealthReportProvider(new DefaultHealthReportProvider());
+  }
+
+  public void start()
+  {
+    if (_timer == null)
+    {
+      _timer = new Timer(true);
+      _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
+          new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
+    }
+    else
+    {
+      _logger.warn("timer already started");
+    }
+  }
+
+  @Override
+  public void addHealthReportProvider(HealthReportProvider provider)
+  {
+    try
+    {
+      synchronized (_healthReportProviderList)
+      {
+        if (!_healthReportProviderList.contains(provider))
+        {
+          _healthReportProviderList.add(provider);
+        }
+        else
+        {
+          _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
+        }
+      }
+    }
+    catch (Exception e)
+    {
+      _logger.error(e);
+    }
+  }
+
+  @Override
+  public void removeHealthReportProvider(HealthReportProvider provider)
+  {
+    synchronized (_healthReportProviderList)
+    {
+      if (_healthReportProviderList.contains(provider))
+      {
+        _healthReportProviderList.remove(provider);
+      }
+      else
+      {
+        _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
+      }
+    }
+  }
+
+  @Override
+  public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
+  {
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+//    accessor.setProperty(
+//        PropertyType.HEALTHREPORT, healthCheckInfoUpdate, _instanceName,
+//        healthCheckInfoUpdate.getId());
+    accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()), 
+                         new HealthStat(healthCheckInfoUpdate));
+
+  }
+
+  public void stop()
+  {
+    _logger.info("Stop HealthCheckInfoReportingTask");
+    if (_timer != null)
+    {
+      _timer.cancel();
+      _timer = null;
+    }
+    else
+    {
+      _logger.warn("timer already stopped");
+    }
+  }
+
+  @Override
+  public synchronized void transmitHealthReports()
+  {
+    synchronized (_healthReportProviderList)
+    {
+      for (HealthReportProvider provider : _healthReportProviderList)
+      {
+        try
+        {
+          Map<String, String> report = provider.getRecentHealthReport();
+          Map<String, Map<String, String>> partitionReport = provider
+              .getRecentPartitionHealthReport();
+          ZNRecord record = new ZNRecord(provider.getReportName());
+          if (report != null)
+          {
+            record.setSimpleFields(report);
+          }
+          if (partitionReport != null)
+          {
+            record.setMapFields(partitionReport);
+          }
+          record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
+          
+          HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+          Builder keyBuilder = accessor.keyBuilder();
+          accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()), 
+                               new HealthStat(record));
+
+//          _helixManager.getDataAccessor().setProperty(
+//              PropertyType.HEALTHREPORT, record, _instanceName, record.getId());
+          // reset stats (for now just the partition stats)
+          provider.resetStats();
+        }
+        catch (Exception e)
+        {
+          _logger.error("", e);
+        }
+      }
+    }
+  }
+
+  class HealthCheckInfoReportingTask extends TimerTask
+  {
+    @Override
+    public void run()
+    {
+      transmitHealthReports();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
new file mode 100644
index 0000000..64229a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+
+public class PerformanceHealthReportProvider extends HealthReportProvider
+{
+
+  private static final Logger _logger = Logger
+      .getLogger(PerformanceHealthReportProvider.class);
+
+  public final static String _testStat = "testStat";
+  public final static String _readLatencyStat = "readLatencyStat";
+  public final static String _requestCountStat = "requestCountStat";
+  public final static String _partitionRequestCountStat = "partitionRequestCountStat";
+
+  public static final String _performanceCounters = "performanceCounters";
+
+  public int readLatencyCount = 0;
+  public double readLatencySum = 0;
+
+  public int requestCount = 0;
+
+  // private final Map<String, String> _partitionCountsMap = new HashMap<String,
+  // String>();
+
+  private final Map<String, HashMap<String, String>> _partitionStatMaps = new HashMap<String, HashMap<String, String>>();
+
+  public PerformanceHealthReportProvider()
+  {
+  }
+
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    long testStat = 10;
+
+    Map<String, String> result = new TreeMap<String, String>();
+
+    result.put(_testStat, "" + testStat);
+    result.put(_readLatencyStat, "" + readLatencySum
+        / (double) readLatencyCount);
+    result.put(_requestCountStat, "" + requestCount);
+
+    return result;
+  }
+
+  @Override
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    Map<String, Map<String, String>> result = new TreeMap<String, Map<String, String>>();
+    for (String statName : _partitionStatMaps.keySet())
+    {
+      result.put(statName, _partitionStatMaps.get(statName));
+    }
+    return result;
+  }
+
+  HashMap<String, String> getStatMap(String statName, boolean createIfMissing)
+  {
+    // check if map for this stat exists. if not, create it
+    HashMap<String, String> statMap;
+    if (!_partitionStatMaps.containsKey(statName))
+    {
+      if (!createIfMissing)
+      {
+        return null;
+      }
+      statMap = new HashMap<String, String>();
+      _partitionStatMaps.put(statName, statMap);
+    }
+    else
+    {
+      statMap = _partitionStatMaps.get(statName);
+    }
+    return statMap;
+  }
+
+  // TODO:
+  // Currently participant is source of truth and updates ZK. We want ZK to be
+  // source of truth.
+  // Revise this approach the participant sends deltas of stats to controller
+  // (ZK?) and have controller do aggregation
+  // and update ZK. Make sure to wipe the participant between uploads.
+  String getPartitionStat(HashMap<String, String> partitionMap,
+      String partitionName)
+  {
+    return partitionMap.get(partitionName);
+  }
+
+  void setPartitionStat(HashMap<String, String> partitionMap,
+      String partitionName, String value)
+  {
+    partitionMap.put(partitionName, value);
+  }
+
+  public void incrementPartitionStat(String statName, String partitionName)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, true);
+    String currValStr = getPartitionStat(statMap, partitionName);
+    double currVal;
+    if (currValStr == null)
+    {
+      currVal = 1.0;
+    }
+    else
+    {
+      currVal = Double.parseDouble(getPartitionStat(statMap, partitionName));
+      currVal++;
+    }
+    setPartitionStat(statMap, partitionName, String.valueOf(currVal));
+  }
+
+  public void submitPartitionStat(String statName, String partitionName,
+      String value)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, true);
+    setPartitionStat(statMap, partitionName, value);
+  }
+
+  public String getPartitionStat(String statName, String partitionName)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, false);
+    if (statMap == null)
+    {
+      return null;
+    }
+    else
+    {
+      return statMap.get(partitionName);
+    }
+  }
+
+  public void resetStats()
+  {
+    _partitionStatMaps.clear();
+  }
+
+  public String getReportName()
+  {
+    return _performanceCounters;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java b/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
new file mode 100644
index 0000000..9629fe9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class Stat
+{
+
+  private static final Logger _logger = Logger.getLogger(Stat.class);
+
+  public final static String OP_TYPE = "HTTP_OP";
+  public final static String MEASUREMENT_TYPE = "MEASUREMENT";
+  public final static String RESOURCE_NAME = "RESOURCE_NAME";
+  public final static String PARTITION_NAME = "PARTITION_NAME";
+  public final static String NODE_NAME = "NODE_NAME";
+  public final static String TIMESTAMP = "TIMESTAMP";
+  public final static String RETURN_STATUS = "RETURN_STATUS";
+  public final static String METRIC_NAME = "METRIC_NAME";
+  public final static String AGG_TYPE = "AGG_TYPE";
+
+  public String _opType;
+  public String _measurementType;
+  public String _resourceName;
+  public String _partitionName;
+  public String _nodeName;
+  public String _returnStatus;
+  public String _metricName;
+  public String _aggTypeName;
+  public String _timestamp;
+
+  public Stat(String opType, String measurementType, String resourceName,
+      String partitionName, String nodeName)
+  {
+    // this(opType, measurementType, resourceName, partitionName, nodeName,
+    // null, null, null);
+    this(opType, measurementType, resourceName, partitionName, nodeName, null,
+        null, null);
+  }
+
+  public Stat(String opType, String measurementType, String resourceName,
+      String partitionName, String nodeName, String returnStatus,
+      String metricName, AggregationType aggType)
+  {
+    this._opType = opType;
+    this._measurementType = measurementType;
+    this._resourceName = resourceName;
+    this._partitionName = partitionName;
+    this._nodeName = nodeName;
+    this._returnStatus = returnStatus;
+    this._metricName = metricName;
+    this._aggTypeName = null;
+    if (aggType != null)
+    {
+      this._aggTypeName = aggType.getName();
+    }
+
+    _timestamp = String.valueOf(System.currentTimeMillis());
+  }
+
+  public Stat(Map<String, String> in)
+  {
+    _opType = in.get(OP_TYPE);
+    _measurementType = in.get(MEASUREMENT_TYPE);
+    _resourceName = in.get(RESOURCE_NAME);
+    _partitionName = in.get(PARTITION_NAME);
+    _nodeName = in.get(NODE_NAME);
+    _timestamp = String.valueOf(System.currentTimeMillis());
+  }
+
+  public void setAggType(AggregationType aggType)
+  {
+    this._aggTypeName = aggType.getName();
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof Stat))
+    {
+      return false;
+    }
+    Stat other = (Stat) obj;
+    if (!_partitionName.equals(other._partitionName))
+    {
+      return false;
+    }
+    if (!_opType.equals(other._opType))
+    {
+      return false;
+    }
+    if (!_measurementType.equals(other._measurementType))
+    {
+      return false;
+    }
+    if (!_resourceName.equals(other._resourceName))
+    {
+      return false;
+    }
+    if (!_nodeName.equals(other._nodeName))
+    {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return (_partitionName + _opType + _measurementType + _resourceName + _nodeName)
+        .hashCode();
+  }
+
+  public void addAlert(long value)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  public String toString()
+  {
+    return _nodeName + "." + _resourceName + "." + _partitionName + "."
+        + _opType + "." + _measurementType + "." + _returnStatus + "."
+        + _metricName + "." + _aggTypeName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
new file mode 100644
index 0000000..e4ee622
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
@@ -0,0 +1,175 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+public class StatHealthReportProvider extends HealthReportProvider
+{
+
+  private static final Logger _logger = Logger
+      .getLogger(StatHealthReportProvider.class);
+
+  /*
+   * public final static String _testStat = "testStat"; public final static
+   * String _readLatencyStat = "readLatencyStat"; public final static String
+   * _requestCountStat = "requestCountStat"; public final static String
+   * _partitionRequestCountStat = "partitionRequestCountStat";
+   */
+
+  public static final String REPORT_NAME = "ParticipantStats";
+  public String _reportName = REPORT_NAME;
+
+  public static final String STAT_VALUE = "value";
+  public static final String TIMESTAMP = "timestamp";
+
+  public int readLatencyCount = 0;
+  public double readLatencySum = 0;
+
+  public int requestCount = 0;
+
+  // private final Map<String, String> _partitionCountsMap = new HashMap<String,
+  // String>();
+
+  // private final Map<String, HashMap<String,String>> _partitionStatMaps = new
+  // HashMap<String, HashMap<String,String>>();
+  private final ConcurrentHashMap<String, String> _statsToValues = new ConcurrentHashMap<String, String>();
+  private final ConcurrentHashMap<String, String> _statsToTimestamps = new ConcurrentHashMap<String, String>();
+
+  public StatHealthReportProvider()
+  {
+  }
+
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    return null;
+  }
+
+  // TODO: function is misnamed, but return type is what I want
+  @Override
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+    for (String stat : _statsToValues.keySet())
+    {
+      Map<String, String> currStat = new HashMap<String, String>();
+      /*
+       * currStat.put(Stat.OP_TYPE, stat._opType);
+       * currStat.put(Stat.MEASUREMENT_TYPE, stat._measurementType);
+       * currStat.put(Stat.NODE_NAME, stat._nodeName);
+       * currStat.put(Stat.PARTITION_NAME, stat._partitionName);
+       * currStat.put(Stat.RESOURCE_NAME, stat._resourceName);
+       * currStat.put(Stat.RETURN_STATUS, stat._returnStatus);
+       * currStat.put(Stat.METRIC_NAME, stat._metricName);
+       * currStat.put(Stat.AGG_TYPE, stat._aggTypeName);
+       */
+      currStat.put(TIMESTAMP, _statsToTimestamps.get(stat));
+      currStat.put(STAT_VALUE, _statsToValues.get(stat));
+      result.put(stat, currStat);
+    }
+    return result;
+  }
+
+  public boolean contains(Stat inStat)
+  {
+    return _statsToValues.containsKey(inStat);
+  }
+
+  public Set<String> keySet()
+  {
+    return _statsToValues.keySet();
+  }
+
+  public String getStatValue(Stat inStat)
+  {
+    return _statsToValues.get(inStat);
+  }
+
+  public long getStatTimestamp(Stat inStat)
+  {
+    return Long.parseLong(_statsToTimestamps.get(inStat));
+  }
+
+  /*
+   * public String getStatValue(String opType, String measurementType, String
+   * resourceName, String partitionName, String nodeName, boolean
+   * createIfMissing) { Stat rs = new Stat(opType, measurementType,
+   * resourceName, partitionName, nodeName); String val =
+   * _statsToValues.get(rs); if (val == null && createIfMissing) { val = "0";
+   * _statsToValues.put(rs, val); } return val; }
+   */
+
+  public void writeStat(String statName, String val, String timestamp)
+  {
+    _statsToValues.put(statName, val);
+    _statsToTimestamps.put(statName, timestamp);
+  }
+
+  /*
+   * public void setStat(Stat es, String val, String timestamp) { writeStat(es,
+   * val, timestamp); }
+   * 
+   * public void setStat(String opType, String measurementType, String
+   * resourceName, String partitionName, String nodeName, double val, String
+   * timestamp) { Stat rs = new Stat(opType, measurementType, resourceName,
+   * partitionName, nodeName); writeStat(rs, String.valueOf(val), timestamp); }
+   */
+
+  public void incrementStat(String statName, String timestamp)
+  {
+    // Stat rs = new Stat(opType, measurementType, resourceName, partitionName,
+    // nodeName);
+    String val = _statsToValues.get(statName);
+    if (val == null)
+    {
+      val = "0";
+    }
+    else
+    {
+      val = String.valueOf(Double.parseDouble(val) + 1);
+    }
+    writeStat(statName, val, timestamp);
+  }
+
+  public int size()
+  {
+    return _statsToValues.size();
+  }
+
+  public void resetStats()
+  {
+    _statsToValues.clear();
+    _statsToTimestamps.clear();
+  }
+
+  public void setReportName(String name)
+  {
+    _reportName = name;
+  }
+
+  public String getReportName()
+  {
+    return _reportName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
new file mode 100644
index 0000000..3db4382
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+public class WindowAggregationType implements AggregationType
+{
+
+  private static final Logger logger = Logger
+      .getLogger(WindowAggregationType.class);
+
+  public final String WINDOW_DELIM = "#";
+
+  public final static String TYPE_NAME = "window";
+
+  int _windowSize = 1;
+
+  public WindowAggregationType(int ws)
+  {
+    super();
+    _windowSize = ws;
+  }
+
+  @Override
+  public String getName()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append(TYPE_NAME);
+    sb.append(DELIM);
+    sb.append(_windowSize);
+    return sb.toString();
+  }
+
+  @Override
+  public String merge(String incomingVal, String existingVal, long prevTimestamp)
+  {
+    String[] windowVals;
+    if (existingVal == null)
+    {
+      return incomingVal;
+    }
+    else
+    {
+      windowVals = existingVal.split(WINDOW_DELIM);
+      int currLength = windowVals.length;
+      // window not full
+      if (currLength < _windowSize)
+      {
+        return existingVal + WINDOW_DELIM + incomingVal;
+      }
+      // evict oldest
+      else
+      {
+        int firstDelim = existingVal.indexOf(WINDOW_DELIM);
+        return existingVal.substring(firstDelim + 1) + WINDOW_DELIM
+            + incomingVal;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java b/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
new file mode 100644
index 0000000..3aa4e53
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix health check classes
+ * 
+ */
+package org.apache.helix.healthcheck;
\ No newline at end of file


Mime
View raw message