helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [07/15] Adding Helix-task-framework and Yarn integration modules
Date Fri, 20 Sep 2013 18:30:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
new file mode 100644
index 0000000..94c617d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
@@ -0,0 +1,329 @@
+package org.apache.helix.metamanager.cluster;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.metamanager.ClusterStatusProvider;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+public class RedisTargetProvider implements ClusterStatusProvider {
+
+    static final Logger        log               = Logger.getLogger(RedisTargetProvider.class);
+
+    public static final String BENCHMARK_COMMAND = "redis-benchmark";
+    public static final String BENCHMARK_TESTS   = "GET,SET";
+
+    public static final String DEFAULT_RECORDS   = "100000";
+    public static final String DEFAULT_CLIENTS   = "20";
+    public static final String DEFAULT_REQUESTS  = "100000";
+    public static final String DEFAULT_TIMEOUT   = "8000";
+    public static final String DEFAULT_INTERVAL  = "10000";
+
+    ZkClient                   zookeeper;
+
+    final String               address;
+    final String               root;
+
+    final int                  records;
+    final int                  clients;
+    final int                  requests;
+    final int                  timeout;
+    final int                  interval;
+
+    int                        targetTpsGet;
+    int                        targetTpsSet;
+    int                        targetCount       = 1;
+
+    ScheduledExecutorService   executor;
+
+    public RedisTargetProvider(Properties properties) {
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        targetTpsGet = Integer.valueOf(properties.getProperty("tps.get", "0"));
+        targetTpsSet = Integer.valueOf(properties.getProperty("tps.set", "0"));
+        records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+        clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+        requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+        timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+        interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+    }
+
+    public void startService() {
+        log.debug("starting redis status service");
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root, true);
+
+        // TODO not concurrency-safe, should not matter though
+        if (!zookeeper.exists("/" + root + "/target.get")) {
+            try {
+                zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet));
+            } catch (Exception ignore) {
+            }
+        }
+        if (!zookeeper.exists("/" + root + "/target.set")) {
+            try {
+                zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet));
+            } catch (Exception ignore) {
+            }
+        }
+
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public void stopService() {
+        log.debug("stopping redis status service");
+        if (executor != null) {
+            executor.shutdownNow();
+            while (!executor.isTerminated()) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            executor = null;
+        }
+        if (zookeeper != null) {
+            zookeeper.close();
+            zookeeper = null;
+        }
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) throws Exception {
+        return targetCount;
+    }
+
+    private class RedisBenchmarkRunnable implements Runnable {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        RedisResult     aggregateResult;
+
+        @Override
+        public void run() {
+            log.debug("running redis benchmark");
+
+            aggregateResult = new RedisResult(0);
+            Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+            try {
+                Collection<RedisTarget> targets = getTargets();
+
+                // start benchmark
+                for (RedisTarget target : targets) {
+                    log.debug(String.format("submitting target '%s'", target));
+                    Future<RedisResult> future = executor.submit(new RedisCallable(target));
+                    futures.add(future);
+                }
+
+                // aggregate results
+                try {
+                    log.debug("waiting for results");
+
+                    long limit = System.currentTimeMillis() + timeout;
+                    for (Future<RedisResult> future : futures) {
+                        try {
+                            RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                            log.debug(String.format("got result '%s'", result));
+                            aggregate(result);
+                        } catch (Exception e) {
+                            log.warn(String.format("failed to get result"));
+                            future.cancel(true);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error running redis benchmark", e);
+
+                    for (Future<RedisResult> future : futures) {
+                        future.cancel(true);
+                    }
+
+                    return;
+                }
+
+                // compare to thresholds
+                log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+                // get target from zookeeper
+                try {
+                    targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get"));
+                } catch (Exception ignore) {
+                }
+                try {
+                    targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set"));
+                } catch (Exception ignore) {
+                }
+
+                // calculate counts
+                int targetCountGet = -1;
+                if (aggregateResult.containsKey("GET")) {
+                    double tpsTarget = targetTpsGet;
+                    double tps = aggregateResult.get("GET");
+
+                    targetCountGet = (int) Math.ceil(tpsTarget / tps * aggregateResult.serverCount);
+                    log.debug(String.format("count.get=%d, tps.get=%f, target.get=%f", targetCountGet, tps, tpsTarget));
+                }
+
+                int targetCountSet = -1;
+                if (aggregateResult.containsKey("SET")) {
+                    double tpsTarget = targetTpsSet;
+                    double tps = aggregateResult.get("SET");
+
+                    targetCountSet = (int) Math.ceil(tpsTarget / tps * aggregateResult.serverCount);
+                    log.debug(String.format("count.set=%d, tps.set=%f, target.set=%f", targetCountSet, tps, tpsTarget));
+                }
+
+                targetCount = Math.max(targetCountGet, targetCountSet);
+                targetCount = Math.max(targetCount, 1);
+
+                log.debug(String.format("target count is %d", targetCount));
+                RedisTargetProvider.this.targetCount = targetCount;
+
+            } catch (Exception e) {
+                log.error("Error running redis benchmark", e);
+
+                for (Future<RedisResult> future : futures) {
+                    future.cancel(true);
+                }
+            }
+
+        }
+
+        Collection<RedisTarget> getTargets() {
+            log.debug("fetching redis servers from zookeeper");
+            Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+            Collection<String> servers = zookeeper.getChildren("/" + root);
+
+            servers.remove("target.get");
+            servers.remove("target.set");
+
+            for (String server : servers) {
+                String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+                int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+                targets.add(new RedisTarget(hostname, port));
+            }
+
+            log.debug(String.format("found %d servers: %s", targets.size(), targets));
+            return targets;
+        }
+
+        void aggregate(RedisResult result) {
+            RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+            for (Entry<String, Double> entry : result.entrySet()) {
+                double current = 0.0d;
+                if (aggregateResult.containsKey(entry.getKey()))
+                    current = aggregateResult.get(entry.getKey());
+
+                current += entry.getValue();
+                newResult.put(entry.getKey(), current);
+            }
+
+            aggregateResult = newResult;
+        }
+    }
+
+    private static class RedisTarget {
+        final String hostname;
+        final int    port;
+
+        public RedisTarget(String hostname, int port) {
+            this.hostname = hostname;
+            this.port = port;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s:%d", hostname, port);
+        }
+    }
+
+    private static class RedisResult extends HashMap<String, Double> {
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 4599748807597500952L;
+
+        final int                 serverCount;
+
+        public RedisResult(int serverCount) {
+            this.serverCount = serverCount;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("[serverCount=%d %s]", serverCount, super.toString());
+        }
+    }
+
+    private class RedisCallable implements Callable<RedisResult> {
+        final RedisTarget target;
+
+        public RedisCallable(RedisTarget target) {
+            this.target = target;
+        }
+
+        @Override
+        public RedisResult call() throws Exception {
+            log.debug(String.format("executing benchmark for '%s'", target));
+
+            ProcessBuilder builder = new ProcessBuilder();
+            builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+                    String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+            Process process = builder.start();
+
+            log.debug(String.format("running '%s'", builder.command()));
+
+            RedisResult result = new RedisResult(1);
+
+            int retVal;
+            try {
+                retVal = process.waitFor();
+            } catch (InterruptedException e) {
+                process.destroy();
+                return result;
+            }
+
+            Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+            Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+            log.debug("parsing output");
+            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = pattern.matcher(line);
+
+                if (!matcher.find())
+                    continue;
+
+                String key = matcher.group(1);
+                Double value = Double.valueOf(matcher.group(2));
+
+                result.put(key, value);
+            }
+
+            log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+            return result;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
new file mode 100644
index 0000000..47bf725
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
@@ -0,0 +1,41 @@
+package org.apache.helix.metamanager.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.ClusterStatusProvider;
+
+
+public class StaticTargetProvider implements ClusterStatusProvider {
+
+	final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+	
+	public StaticTargetProvider() {
+	    // left blank
+	}
+	
+	public StaticTargetProvider(Properties properties) {
+	    for(Entry<Object, Object> entry : properties.entrySet()) {
+	        String key = (String)entry.getKey();
+	        int value = Integer.valueOf((String)entry.getValue());
+	        
+	        targetCounts.put(key, value);
+	    }
+	}
+	
+	public StaticTargetProvider(Map<String, Integer> targetCounts) {
+		this.targetCounts.putAll(targetCounts);
+	}
+	
+	@Override
+	public int getTargetContainerCount(String containerType) {
+		return targetCounts.get(containerType);
+	}
+
+	public void setTargetContainerCount(String containerType, int targetCount) {
+		targetCounts.put(containerType, targetCount);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
new file mode 100644
index 0000000..11ad86e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
@@ -0,0 +1,133 @@
+package org.apache.helix.metamanager.container;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.Service;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base service for spawn-able container types. Configure from Properties and
+ * instantiates Helix participant to managed cluster.
+ * 
+ */
+public abstract class ContainerProcess implements Service {
+    static final Logger                             log    = Logger.getLogger(ContainerProcess.class);
+
+    private ContainerProcessProperties              properties;
+    private HelixManager                            participantManager;
+
+    private String                                  modelName;
+    private StateModelFactory<? extends StateModel> modelFactory;
+
+    private String                                  instanceName;
+    private String                                  clusterName;
+    private String                                  zookeeperAddress;
+
+    private boolean                                 active = false;
+    private boolean                                 failed = false;
+
+    public final void setModelName(String modelName) {
+        this.modelName = modelName;
+    }
+
+    public final void setModelFactory(StateModelFactory<? extends StateModel> modelFactory) {
+        this.modelFactory = modelFactory;
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        ContainerProcessProperties containerProps = new ContainerProcessProperties();
+        containerProps.putAll(properties);
+        Preconditions.checkArgument(containerProps.isValid());
+
+        this.properties = containerProps;
+        this.instanceName = containerProps.getName();
+        this.clusterName = containerProps.getCluster();
+        this.zookeeperAddress = containerProps.getAddress();
+    }
+
+    @Override
+    public final void start() {
+        try {
+            Preconditions.checkNotNull(modelName, "state model name not set");
+            Preconditions.checkNotNull(modelFactory, "state model factory not set");
+            Preconditions.checkState(properties.isValid(), "process properties not valid: %s", properties.toString());
+
+            log.info(String.format("starting container '%s'", instanceName));
+            startContainer();
+
+            log.info(String.format("starting helix participant '%s'", instanceName));
+            startParticipant();
+
+            active = true;
+
+        } catch (Exception e) {
+            log.error(String.format("starting container '%s' failed", instanceName), e);
+            fail();
+        }
+    }
+
+    protected abstract void startContainer() throws Exception;
+
+    private final void startParticipant() throws Exception {
+        participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
+        participantManager.getStateMachineEngine().registerStateModelFactory(modelName, modelFactory);
+        participantManager.connect();
+    }
+
+    @Override
+    public final void stop() {
+        try {
+            log.info(String.format("stopping helix participant '%s'", instanceName));
+            stopParticipant();
+
+            log.info(String.format("stopping container '%s'", instanceName));
+            stopContainer();
+
+            active = false;
+
+        } catch (Exception e) {
+            log.warn(String.format("stopping container '%s' failed", instanceName), e);
+        }
+    }
+
+    protected abstract void stopContainer() throws Exception;
+
+    private final void stopParticipant() {
+        if (participantManager != null) {
+            participantManager.disconnect();
+        }
+    }
+
+    public final void fail() {
+        failed = true;
+    }
+
+    public final boolean isActive() {
+        return active && !failed;
+    }
+
+    public final boolean isFailed() {
+        return failed;
+    }
+
+    public final ContainerProcessProperties getProperties() {
+        return properties;
+    }
+
+    String getModelName() {
+        return modelName;
+    }
+
+    StateModelFactory<? extends StateModel> getModelFactory() {
+        return modelFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
new file mode 100644
index 0000000..1a6d272
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
@@ -0,0 +1,66 @@
+package org.apache.helix.metamanager.container;
+
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for ContainerProcess. 
+ * 
+ */
+public class ContainerProcessProperties extends Properties {
+    /**
+	 * 
+	 */
+    private static final long  serialVersionUID = 5754863079470995536L;
+
+    public static final String CLUSTER          = "cluster";
+    public static final String ADDRESS          = "address";
+    public static final String NAME             = "name";
+    public static final String CONTAINER_CLASS  = "class";
+
+    public ContainerProcessProperties() {
+        // left blank
+    }
+
+    public ContainerProcessProperties(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        putAll(properties);
+    }
+	
+	public boolean isValid() {
+		return containsKey(CLUSTER) &&
+			   containsKey(NAME) &&
+			   containsKey(ADDRESS) &&
+			   containsKey(CONTAINER_CLASS);
+	}
+	
+    public String getCluster() {
+        return getProperty(CLUSTER);
+    }
+
+    public String getAddress() {
+        return getProperty(ADDRESS);
+    }
+
+    public String getName() {
+        return getProperty(NAME);
+    }
+
+    public String getContainerClass() {
+        return getProperty(CONTAINER_CLASS);
+    }
+
+    @Override
+    public synchronized Object get(Object key) {
+        Preconditions.checkState(containsKey(key));
+        return super.get(key);
+    }
+
+    @Override
+    public String getProperty(String key) {
+        Preconditions.checkState(containsKey(key));
+        return super.getProperty(key);
+    }
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
new file mode 100644
index 0000000..9ac6b5c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metamanager.container;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+public class ContainerStateModel extends StateModel {
+	
+	static final Logger log = Logger.getLogger(ContainerStateModel.class);
+	
+	@Transition(from = "OFFLINE", to = "SLAVE")
+	public void offlineToSlave(Message m, NotificationContext context) {
+		log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+				context.getManager().getInstanceName()));
+	}
+
+	@Transition(from = "SLAVE", to = "OFFLINE")
+	public void slaveToOffline(Message m, NotificationContext context) {
+		log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+				context.getManager().getInstanceName()));
+	}
+
+	@Transition(from = "SLAVE", to = "MASTER")
+	public void slaveToMaster(Message m, NotificationContext context) {
+		log.trace(String.format("%s transitioning from SLAVE to MASTER",
+				context.getManager().getInstanceName()));
+	}
+
+	@Transition(from = "MASTER", to = "SLAVE")
+	public void masterToSlave(Message m, NotificationContext context) {
+		log.trace(String.format("%s transitioning from MASTER to SLAVE",
+				context.getManager().getInstanceName()));
+	}
+
+	@Transition(from = "OFFLINE", to = "DROPPED")
+	public void offlineToDropped(Message m, NotificationContext context) {
+		log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+				context.getManager().getInstanceName()));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
new file mode 100644
index 0000000..ab5a099
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager.container;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class ContainerStateModelFactory extends StateModelFactory<ContainerStateModel> {
+
+	@Override
+	public ContainerStateModel createNewStateModel(String partitionName) {
+		return new ContainerStateModel();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
new file mode 100644
index 0000000..3d32862
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
@@ -0,0 +1,46 @@
+package org.apache.helix.metamanager.container;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for loading ContainerProperties and spawning ContainerProcess.
+ * 
+ */
+public class ContainerUtils {
+
+    static final Logger log = Logger.getLogger(ContainerUtils.class);
+
+    private ContainerUtils() {
+        // left blank
+    }
+
+    public static ContainerProcess createProcess(ContainerProcessProperties properties) throws Exception {
+        String containerClassName = properties.getContainerClass();
+
+        Class<?> containerClass = Class.forName(containerClassName);
+
+        log.debug(String.format("checking for properties constructor in class '%s'", containerClassName));
+
+        Constructor<?> constructor = containerClass.getConstructor(ContainerProcessProperties.class);
+
+        return (ContainerProcess) constructor.newInstance(properties);
+    }
+
+    public static ContainerProcessProperties getPropertiesFromResource(String resourceName) throws IOException {
+        ContainerProcessProperties properties = new ContainerProcessProperties();
+        properties.load(ClassLoader.getSystemResourceAsStream(resourceName));
+        return properties;
+    }
+
+    public static ContainerProcessProperties getPropertiesFromPath(String filePath) throws IOException {
+        ContainerProcessProperties properties = new ContainerProcessProperties();
+        properties.load(new InputStreamReader(new FileInputStream(filePath)));
+        return properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..d91d77c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+	static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+	
+	public DummyMasterSlaveProcess(ContainerProcessProperties properties) {
+		super(properties);
+		setModelName("MasterSlave");
+		setModelFactory(new DummyMasterSlaveModelFactory());
+	}
+
+	@Override
+	protected void startContainer() throws Exception {
+		log.info("starting dummy process container");
+	}
+
+	@Override
+	protected void stopContainer() throws Exception {
+		log.info("stopping dummy process container");
+	}
+
+	public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+		@Override
+		public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+			return new DummyMasterSlaveStateModel();
+		}
+	}
+	
+	@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+	public static class DummyMasterSlaveStateModel extends StateModel {
+		
+		static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+		
+		@Transition(from = "OFFLINE", to = "SLAVE")
+		public void offlineToSlave(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "SLAVE", to = "OFFLINE")
+		public void slaveToOffline(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "SLAVE", to = "MASTER")
+		public void slaveToMaster(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from SLAVE to MASTER",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "MASTER", to = "SLAVE")
+		public void masterToSlave(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from MASTER to SLAVE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "OFFLINE", to = "DROPPED")
+		public void offlineToDropped(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+					context.getManager().getInstanceName()));
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..d5015f4
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+	static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+	
+	public DummyOnlineOfflineProcess(ContainerProcessProperties properties) {
+		super(properties);
+		setModelName("OnlineOffline");
+		setModelFactory(new DummyOnlineOfflineModelFactory());
+	}
+
+	@Override
+	protected void startContainer() throws Exception {
+		log.info("starting dummy online-offline process container");
+	}
+
+	@Override
+	protected void stopContainer() throws Exception {
+		log.info("stopping dummy online-offline process container");
+	}
+
+	public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+		@Override
+		public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+			return new DummyOnlineOfflineStateModel();
+		}
+	}
+	
+	@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+	public static class DummyOnlineOfflineStateModel extends StateModel {
+		
+		static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+		
+		@Transition(from = "OFFLINE", to = "ONLINE")
+		public void offlineToOnline(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to ONLINE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "ONLINE", to = "OFFLINE")
+		public void onlineToOffline(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from ONLINE to OFFLINE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "OFFLINE", to = "DROPPED")
+		public void offlineToDropped(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+					context.getManager().getInstanceName()));
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
new file mode 100644
index 0000000..b4963a7
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyProcess extends ContainerProcess {
+
+	static final Logger log = Logger.getLogger(DummyProcess.class);
+	
+	public DummyProcess(ContainerProcessProperties properties) {
+		super(properties);
+		setModelName("MasterSlave");
+		setModelFactory(new DummyModelFactory());
+	}
+
+	@Override
+	protected void startContainer() throws Exception {
+		log.info("starting dummy process container");
+	}
+
+	@Override
+	protected void stopContainer() throws Exception {
+		log.info("stopping dummy process container");
+	}
+
+	public static class DummyModelFactory extends StateModelFactory<DummyStateModel> {
+		@Override
+		public DummyStateModel createNewStateModel(String partitionName) {
+			return new DummyStateModel();
+		}
+	}
+	
+	@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+	public static class DummyStateModel extends StateModel {
+		
+		static final Logger log = Logger.getLogger(DummyStateModel.class);
+		
+		@Transition(from = "OFFLINE", to = "SLAVE")
+		public void offlineToSlave(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "SLAVE", to = "OFFLINE")
+		public void slaveToOffline(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "SLAVE", to = "MASTER")
+		public void slaveToMaster(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from SLAVE to MASTER",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "MASTER", to = "SLAVE")
+		public void masterToSlave(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from MASTER to SLAVE",
+					context.getManager().getInstanceName()));
+		}
+
+		@Transition(from = "OFFLINE", to = "DROPPED")
+		public void offlineToDropped(Message m, NotificationContext context) {
+			log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+					context.getManager().getInstanceName()));
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
new file mode 100644
index 0000000..d084a71
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
@@ -0,0 +1,135 @@
+package org.apache.helix.metamanager.container.impl;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class RedisServerProcess extends ContainerProcess {
+
+    static final Logger        log                  = Logger.getLogger(RedisServerProcess.class);
+
+    public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+    public static final long   MONITOR_INTERVAL     = 5000;
+
+    ZkClient                   zookeeper;
+
+    final String               address;
+    final String               root;
+    final String               name;
+    final int                  basePort;
+
+    Process                    process;
+
+    ScheduledExecutorService   executor;
+
+    public RedisServerProcess(ContainerProcessProperties properties) {
+        super(properties);
+
+        setModelName("OnlineOffline");
+        setModelFactory(new RedisServerModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        basePort = Integer.valueOf(properties.getProperty("baseport"));
+        name = properties.getProperty(ContainerProcessProperties.HELIX_INSTANCE);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info(String.format("starting redis server container for instance '%s'", name));
+
+        String hostname = InetAddress.getLocalHost().getHostName();
+        int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+        log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+        ProcessBuilder builder = new ProcessBuilder();
+        builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+        process = builder.start();
+
+        log.debug("Updating zookeeper");
+        zookeeper = new ZkClient(address);
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+        zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+        zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+
+        log.debug("Starting process monitor");
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping redis server container");
+
+        log.debug("Stopping process monitor");
+        executor.shutdownNow();
+
+        log.debug("Updating zookeeper");
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.close();
+
+        log.debug("Stopping process");
+        process.destroy();
+        process.waitFor();
+    }
+
+    public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+        @Override
+        public RedisServerModel createNewStateModel(String partitionName) {
+            return new RedisServerModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+    public class RedisServerModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "ONLINE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "ONLINE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+    }
+
+    private class ProcessMonitor implements Runnable {
+        @Override
+        public void run() {
+            try {
+                process.exitValue();
+                log.warn("detected process failure");
+                fail();
+            } catch (Exception e) {
+                // expected
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..f8bbc85
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,104 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+    ZkClient            zookeeper;
+
+    final String        address;
+    final String        root;
+    final String        name;
+
+    public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) {
+        super(properties);
+
+        setModelName("MasterSlave");
+        setModelFactory(new ZookeeperMasterSlaveModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        name = properties.getProperty(ContainerProcessProperties.HELIX_INSTANCE);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting zookeeper process container");
+
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping zookeeper process container");
+
+        zookeeper.close();
+    }
+
+    public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+        @Override
+        public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+            return new ZookeeperMasterSlaveModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public class ZookeeperMasterSlaveModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+        }
+
+        public void transition(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+            zookeeper.createEphemeral(path, m.getToState());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
new file mode 100644
index 0000000..6eac3e8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
@@ -0,0 +1,51 @@
+package org.apache.helix.metamanager.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.TargetProviderService;
+
+/**
+ * File-based target model. Container count is extracted from properties file. Count may change dynamically.
+ * 
+ */
+public class FileTargetProvider implements TargetProviderService {
+
+    File file;
+
+    public FileTargetProvider() {
+        // left blank
+    }
+
+    public FileTargetProvider(String path) {
+        this.file = new File(path);
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+        Properties properties = new Properties();
+        properties.load(new FileReader(file));
+        if (!properties.contains(containerType))
+            throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+        return Integer.parseInt((String) properties.get(containerType));
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        this.file = new File(properties.getProperty("path"));
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
new file mode 100644
index 0000000..1fdf96e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
@@ -0,0 +1,356 @@
+package org.apache.helix.metamanager.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.metamanager.TargetProviderService;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Redis-specific target model based on recurring Tps benchmarking. Tps target
+ * and probed redis-server instances are configured via zookeeper. Tps target
+ * may change dynamically.
+ * 
+ */
+public class RedisTargetProvider implements TargetProviderService {
+
+    static final Logger        log               = Logger.getLogger(RedisTargetProvider.class);
+
+    public static final String BENCHMARK_COMMAND = "redis-benchmark";
+    public static final String BENCHMARK_TESTS   = "GET,SET";
+
+    public static final String DEFAULT_RECORDS   = "100000";
+    public static final String DEFAULT_CLIENTS   = "20";
+    public static final String DEFAULT_REQUESTS  = "100000";
+    public static final String DEFAULT_TIMEOUT   = "8000";
+    public static final String DEFAULT_INTERVAL  = "10000";
+    public static final String DEFAULT_ALPHA     = "0.25";
+
+    ZkClient                   zookeeper;
+
+    String                     address;
+    String                     root;
+
+    int                        records;
+    int                        clients;
+    int                        requests;
+    int                        timeout;
+    int                        interval;
+
+    int                        targetTpsGet;
+    int                        targetTpsSet;
+
+    int                        targetCountMin;
+    int                        targetCountMax;
+    int                        targetCount;
+
+    double                     alpha;
+    double                     averageTpsGet;
+    double                     averageTpsSet;
+    double                     averageCount;
+
+    ScheduledExecutorService   executor;
+
+    @Override
+    public void configure(Properties properties) {
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        targetTpsGet = Integer.valueOf(properties.getProperty("get", "0"));
+        targetTpsSet = Integer.valueOf(properties.getProperty("set", "0"));
+        targetCountMin = Integer.valueOf(properties.getProperty("min", "-1"));
+        targetCountMax = Integer.valueOf(properties.getProperty("max", "-1"));
+        records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+        clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+        requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+        timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+        interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+        alpha = Double.valueOf(properties.getProperty("alpha", DEFAULT_ALPHA));
+    }
+
+    @Override
+    public void start() {
+        log.debug("starting redis status service");
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root, true);
+
+        try { zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.min", String.valueOf(targetCountMin)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.max", String.valueOf(targetCountMax)); } catch (Exception ignore) {}
+ 
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void stop() {
+        log.debug("stopping redis status service");
+        if (executor != null) {
+            executor.shutdownNow();
+            while (!executor.isTerminated()) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            executor = null;
+        }
+        if (zookeeper != null) {
+            zookeeper.close();
+            zookeeper = null;
+        }
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) throws Exception {
+        return targetCount;
+    }
+
+    private class RedisBenchmarkRunnable implements Runnable {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        RedisResult     aggregateResult;
+
+        @Override
+        public void run() {
+            log.debug("running redis benchmark");
+
+            aggregateResult = new RedisResult(0);
+            Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+            try {
+                Collection<RedisTarget> targets = getTargets();
+
+                // start benchmark
+                for (RedisTarget target : targets) {
+                    log.debug(String.format("submitting target '%s'", target));
+                    Future<RedisResult> future = executor.submit(new RedisCallable(target));
+                    futures.add(future);
+                }
+
+                // aggregate results
+                try {
+                    log.debug("waiting for results");
+
+                    long limit = System.currentTimeMillis() + timeout;
+                    for (Future<RedisResult> future : futures) {
+                        try {
+                            RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                            log.debug(String.format("got result '%s'", result));
+                            aggregate(result);
+                        } catch (Exception e) {
+                            log.warn(String.format("failed to get result"));
+                            future.cancel(true);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error running redis benchmark", e);
+
+                    for (Future<RedisResult> future : futures) {
+                        future.cancel(true);
+                    }
+
+                    return;
+                }
+
+                // compare to thresholds
+                log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+                // get target from zookeeper
+                try { targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get")); } catch (Exception ignore) {}
+                try { targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set")); } catch (Exception ignore) {}
+                try { targetCountMin = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.min")); } catch (Exception ignore) {}
+                try { targetCountMax = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.max")); } catch (Exception ignore) {}
+
+                averageCount = alpha * aggregateResult.serverCount + (1.0 - alpha) * averageCount;
+
+                // calculate counts
+                int targetCountGet = -1;
+                if (aggregateResult.containsKey("GET")) {
+                    double tpsTarget = targetTpsGet;
+                    double tps = aggregateResult.get("GET");
+
+                    averageTpsGet = alpha * tps + (1.0 - alpha) * averageTpsGet;
+
+                    targetCountGet = (int) Math.ceil(tpsTarget / averageTpsGet * averageCount);
+                    log.debug(String.format("count.get=%d, target.get=%f, tps.get=%f, tps.avg.get=%f, count.avg=%f", targetCountGet, tpsTarget, tps,
+                            averageTpsGet, averageCount));
+                }
+
+                int targetCountSet = -1;
+                if (aggregateResult.containsKey("SET")) {
+                    double tpsTarget = targetTpsSet;
+                    double tps = aggregateResult.get("SET");
+
+                    averageTpsSet = alpha * tps + (1.0 - alpha) * averageTpsSet;
+
+                    targetCountSet = (int) Math.ceil(tpsTarget / averageTpsSet * averageCount);
+                    log.debug(String.format("count.set=%d, target.set=%f, tps.set=%f, tps.avg.set=%f, count.avg=%f", targetCountSet, tpsTarget, tps,
+                            averageTpsSet, averageCount));
+                }
+
+                targetCount = Math.max(targetCountGet, targetCountSet);
+
+                if (targetCountMin > 0)
+                    targetCount = Math.max(targetCount, targetCountMin);
+                if (targetCountMax > 0)
+                    targetCount = Math.min(targetCount, targetCountMax);
+
+                targetCount = Math.max(targetCount, 1);
+
+                log.debug(String.format("target count is %d", targetCount));
+                RedisTargetProvider.this.targetCount = targetCount;
+
+            } catch (Exception e) {
+                log.error("Error running redis benchmark", e);
+
+                for (Future<RedisResult> future : futures) {
+                    future.cancel(true);
+                }
+            }
+
+        }
+
+        Collection<RedisTarget> getTargets() {
+            log.debug("fetching redis servers from zookeeper");
+            Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+            Collection<String> servers = zookeeper.getChildren("/" + root);
+
+            servers.remove("target.get");
+            servers.remove("target.set");
+            servers.remove("target.min");
+            servers.remove("target.max");
+
+            for (String server : servers) {
+                if (!zookeeper.exists("/" + root + "/" + server + "/heartbeat"))
+                    continue;
+
+                String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+                int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+                targets.add(new RedisTarget(hostname, port));
+            }
+
+            log.debug(String.format("found %d servers: %s", targets.size(), targets));
+            return targets;
+        }
+
+        void aggregate(RedisResult result) {
+            RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+            for (Entry<String, Double> entry : result.entrySet()) {
+                double current = 0.0d;
+                if (aggregateResult.containsKey(entry.getKey()))
+                    current = aggregateResult.get(entry.getKey());
+
+                current += entry.getValue();
+                newResult.put(entry.getKey(), current);
+            }
+
+            aggregateResult = newResult;
+        }
+    }
+
+    private static class RedisTarget {
+        final String hostname;
+        final int    port;
+
+        public RedisTarget(String hostname, int port) {
+            this.hostname = hostname;
+            this.port = port;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s:%d", hostname, port);
+        }
+    }
+
+    private static class RedisResult extends HashMap<String, Double> {
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 4599748807597500952L;
+
+        final int                 serverCount;
+
+        public RedisResult(int serverCount) {
+            this.serverCount = serverCount;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("[serverCount=%d %s]", serverCount, super.toString());
+        }
+    }
+
+    private class RedisCallable implements Callable<RedisResult> {
+        final RedisTarget target;
+
+        public RedisCallable(RedisTarget target) {
+            this.target = target;
+        }
+
+        @Override
+        public RedisResult call() throws Exception {
+            log.debug(String.format("executing benchmark for '%s'", target));
+
+            ProcessBuilder builder = new ProcessBuilder();
+            builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+                    String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+            Process process = builder.start();
+
+            log.debug(String.format("running '%s'", builder.command()));
+
+            RedisResult result = new RedisResult(1);
+
+            int retVal;
+            try {
+                retVal = process.waitFor();
+            } catch (InterruptedException e) {
+                process.destroy();
+                return result;
+            }
+
+            Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+            Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+            log.debug("parsing output");
+            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = pattern.matcher(line);
+
+                if (!matcher.find())
+                    continue;
+
+                String key = matcher.group(1);
+                Double value = Double.valueOf(matcher.group(2));
+
+                result.put(key, value);
+            }
+
+            log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+            return result;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
new file mode 100644
index 0000000..3159fbe
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
@@ -0,0 +1,62 @@
+package org.apache.helix.metamanager.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.TargetProviderService;
+import org.apache.log4j.Logger;
+
+/**
+ * Target model based on manually set count. Count may change dynamically.
+ * 
+ */
+public class StaticTargetProvider implements TargetProviderService {
+    static final Logger        log          = Logger.getLogger(StaticTargetProvider.class);
+
+    final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+    public StaticTargetProvider() {
+        // left blank
+    }
+
+    public StaticTargetProvider(Map<String, Integer> targetCounts) {
+        this.targetCounts.putAll(targetCounts);
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) {
+        return targetCounts.get(containerType);
+    }
+
+    public void setTargetContainerCount(String containerType, int targetCount) {
+        targetCounts.put(containerType, targetCount);
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            String key = (String) entry.getKey();
+
+            try {
+                int value = Integer.valueOf((String) entry.getValue());
+                log.debug(String.format("Inserting value '%s = %d'", key, value));
+                targetCounts.put(key, value);
+            } catch (NumberFormatException e) {
+                log.warn(String.format("Skipping '%s', not an integer (value='%s')", key, (String) entry.getValue()));
+            }
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..2d91bdd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Print state transitions only.
+ * 
+ */
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+
+    public DummyMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("MasterSlave");
+        setModelFactory(new DummyMasterSlaveModelFactory());
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting dummy process container");
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping dummy process container");
+    }
+
+    public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+        @Override
+        public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+            return new DummyMasterSlaveStateModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public static class DummyMasterSlaveStateModel extends StateModel {
+
+        static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to SLAVE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from SLAVE to OFFLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from SLAVE to MASTER", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from MASTER to SLAVE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..62f63a8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
@@ -0,0 +1,66 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for OnlineOffline
+ * state model. Print state transitions only.
+ * 
+ */
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+
+    public DummyOnlineOfflineProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("OnlineOffline");
+        setModelFactory(new DummyOnlineOfflineModelFactory());
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting dummy online-offline process container");
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping dummy online-offline process container");
+    }
+
+    public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+        @Override
+        public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+            return new DummyOnlineOfflineStateModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+    public static class DummyOnlineOfflineStateModel extends StateModel {
+
+        static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+
+        @Transition(from = "OFFLINE", to = "ONLINE")
+        public void offlineToOnline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to ONLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "ONLINE", to = "OFFLINE")
+        public void onlineToOffline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from ONLINE to OFFLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
new file mode 100644
index 0000000..c87f905
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
@@ -0,0 +1,140 @@
+package org.apache.helix.metamanager.impl.container;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Container implementation for redis-server. Uses OnlineOffline model, spawns
+ * Redis as Shell process and writes metadata to zookeeper.
+ * 
+ */
+public class RedisServerProcess extends ContainerProcess {
+
+    static final Logger        log                  = Logger.getLogger(RedisServerProcess.class);
+
+    public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+    public static final long   MONITOR_INTERVAL     = 5000;
+
+    ZkClient                   zookeeper;
+
+    final String               address;
+    final String               root;
+    final String               name;
+    final int                  basePort;
+
+    Process                    process;
+
+    ScheduledExecutorService   executor;
+
+    public RedisServerProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("OnlineOffline");
+        setModelFactory(new RedisServerModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        basePort = Integer.valueOf(properties.getProperty("baseport"));
+        name = properties.getProperty(ContainerProcessProperties.NAME);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info(String.format("starting redis server container for instance '%s'", name));
+
+        String hostname = InetAddress.getLocalHost().getHostName();
+        int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+        log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+        ProcessBuilder builder = new ProcessBuilder();
+        builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+        process = builder.start();
+
+        log.debug("Updating zookeeper");
+        zookeeper = new ZkClient(address);
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+        zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+        zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+        zookeeper.createEphemeral("/" + root + "/" + name + "/heartbeat");
+
+        log.debug("Starting process monitor");
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping redis server container");
+
+        log.debug("Stopping process monitor");
+        executor.shutdownNow();
+
+        log.debug("Updating zookeeper");
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.close();
+
+        log.debug("Stopping process");
+        process.destroy();
+        process.waitFor();
+    }
+
+    public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+        @Override
+        public RedisServerModel createNewStateModel(String partitionName) {
+            return new RedisServerModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+    public class RedisServerModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "ONLINE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "ONLINE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+    }
+
+    private class ProcessMonitor implements Runnable {
+        @Override
+        public void run() {
+            try {
+                process.exitValue();
+                log.warn("detected process failure");
+                fail();
+            } catch (Exception e) {
+                // expected
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..a493a71
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,108 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Writes current state to separate zookeeper domain.
+ * 
+ */
+public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+    ZkClient            zookeeper;
+
+    final String        address;
+    final String        root;
+    final String        name;
+
+    public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("MasterSlave");
+        setModelFactory(new ZookeeperMasterSlaveModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        name = properties.getProperty(ContainerProcessProperties.NAME);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting zookeeper process container");
+
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping zookeeper process container");
+
+        zookeeper.close();
+    }
+
+    public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+        @Override
+        public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+            return new ZookeeperMasterSlaveModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public class ZookeeperMasterSlaveModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+        }
+
+        public void transition(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+            zookeeper.createEphemeral(path, m.getToState());
+        }
+
+    }
+
+}


Mime
View raw message