helix-commits mailing list archives

From swaroop...@apache.org
Subject [1/2] git commit: HELIX-16: Recipe for distributed task execution
Date Sat, 06 Apr 2013 00:31:20 GMT
Updated Branches:
  refs/heads/master 6ffeb30b6 -> f3e325510


HELIX-16: Recipe for distributed task execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/ec8000e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/ec8000e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/ec8000e1

Branch: refs/heads/master
Commit: ec8000e1d3e93e2533afebcd83938b4528fb00c9
Parents: 567fa30
Author: Swaroop Jagadish <sjagadis@sjagadis-ld.linkedin.biz>
Authored: Fri Apr 5 17:26:07 2013 -0700
Committer: Swaroop Jagadish <sjagadis@sjagadis-ld.linkedin.biz>
Committed: Fri Apr 5 17:26:07 2013 -0700

----------------------------------------------------------------------
 recipes/task-execution/README.md                   |  253 +++++++++++++++
 recipes/task-execution/pom.xml                     |  129 ++++++++
 .../src/main/config/log4j.properties               |   31 ++
 .../helix/taskexecution/AnalyticsTaskFactory.java  |   59 ++++
 .../org/apache/helix/taskexecution/CountTask.java  |   76 +++++
 .../java/org/apache/helix/taskexecution/Dag.java   |  125 +++++++
 .../org/apache/helix/taskexecution/FilterTask.java |   61 ++++
 .../org/apache/helix/taskexecution/JoinTask.java   |   74 +++++
 .../helix/taskexecution/RedisTaskResultStore.java  |  107 ++++++
 .../org/apache/helix/taskexecution/ReportTask.java |   57 ++++
 .../java/org/apache/helix/taskexecution/Task.java  |  128 ++++++++
 .../apache/helix/taskexecution/TaskCluster.java    |   86 +++++
 .../helix/taskexecution/TaskExecutionDemo.java     |  159 +++++++++
 .../apache/helix/taskexecution/TaskFactory.java    |   28 ++
 .../helix/taskexecution/TaskResultStore.java       |   39 +++
 .../apache/helix/taskexecution/TaskStateModel.java |  101 ++++++
 .../helix/taskexecution/TaskStateModelFactory.java |   40 +++
 .../org/apache/helix/taskexecution/Worker.java     |  131 ++++++++
 18 files changed, 1684 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/README.md
----------------------------------------------------------------------
diff --git a/recipes/task-execution/README.md b/recipes/task-execution/README.md
new file mode 100644
index 0000000..fdba382
--- /dev/null
+++ b/recipes/task-execution/README.md
@@ -0,0 +1,253 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Distributed lock manager
+------------------------
+Distributed locks are used to synchronize access to shared resources. Most applications use ZooKeeper to implement distributed locks.
+
+The simplest way to model a lock using ZooKeeper is as follows (see the ZooKeeper leader election recipe for an exact and more advanced solution):
+
+* Each process tries to create an ephemeral znode.
+* If a process successfully creates the znode, it acquires the lock.
+* Otherwise, it watches the znode and tries to acquire the lock again when the current lock holder disappears.
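The recipe above can be sketched in a few lines. This is a hypothetical illustration only: an in-memory map stands in for a real ZooKeeper ensemble, and the class and method names (`SimpleLock`, `tryAcquire`, `release`) are invented, not ZooKeeper or Helix APIs. A real implementation would use the ZooKeeper client, watches, and session-expiry handling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the ephemeral-node locking recipe. The "znodes" map stands in
// for ZooKeeper: putIfAbsent succeeds only for the first caller, much like
// an ephemeral znode create.
public class SimpleLock {
    private static final Map<String, String> znodes = new ConcurrentHashMap<>();

    // Try to create the "ephemeral node"; returns true if this owner acquired the lock.
    public static boolean tryAcquire(String lockPath, String owner) {
        return znodes.putIfAbsent(lockPath, owner) == null;
    }

    // Simulates the lock holder's session ending: its ephemeral node is deleted.
    // A watcher on the znode would then retry tryAcquire.
    public static void release(String lockPath, String owner) {
        znodes.remove(lockPath, owner);
    }

    public static void main(String[] args) {
        System.out.println(tryAcquire("/lock", "node1")); // first creator wins
        System.out.println(tryAcquire("/lock", "node2")); // blocked: would watch and wait
        release("/lock", "node1");                        // holder disappears
        System.out.println(tryAcquire("/lock", "node2")); // waiter acquires on retry
    }
}
```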
+
+This is good enough if there is only one lock, but in practice an application needs many such locks. Distributing and managing the locks among different processes becomes challenging. Extending this solution to many locks results in:
+
+* Uneven distribution of locks among nodes: the node that starts first acquires all the locks, while nodes that start later sit idle.
+* When a node fails, how its locks are distributed among the remaining nodes is not predictable.
+* When new nodes are added, the current nodes do not relinquish any locks, so the new nodes cannot acquire them.
+
+In other words, we want a system that satisfies the following requirements:
+
+* Distribute locks evenly among all nodes for better hardware utilization.
+* If a node fails, the locks it held should be evenly distributed among the remaining nodes.
+* If nodes are added, locks must be evenly redistributed among all nodes.
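To make these requirements concrete, here is a hypothetical sketch (not Helix code; the names `LockAssignment`, `assign`, and `onNodeFailure` are invented) of the distribution behavior they describe: locks are spread round-robin over live nodes, and when a node dies only its locks are reassigned to the survivors.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the distribution constraints: spread locks
// round-robin over live nodes; on failure, move only the dead node's
// locks to the surviving nodes.
public class LockAssignment {
    // Assign each lock to a node round-robin, so ownership is even.
    public static Map<String, String> assign(List<String> locks, List<String> nodes) {
        Map<String, String> owner = new TreeMap<>();
        for (int i = 0; i < locks.size(); i++) {
            owner.put(locks.get(i), nodes.get(i % nodes.size()));
        }
        return owner;
    }

    // Reassign only the failed node's locks, round-robin among survivors,
    // leaving every other assignment untouched.
    public static void onNodeFailure(Map<String, String> owner, String dead, List<String> survivors) {
        int i = 0;
        for (Map.Entry<String, String> e : owner.entrySet()) {
            if (e.getValue().equals(dead)) {
                e.setValue(survivors.get(i++ % survivors.size()));
            }
        }
    }
}
```

This mirrors the behavior described below for the demo: 12 locks over three nodes gives 4 each, and after one node fails the survivors end up with 6 each.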
+
+Helix provides a simple and elegant solution to this problem: simply specify the number of locks, and Helix ensures that the above constraints are satisfied.
+
+To quickly see this in action, run the lock-manager-demo script, in which 12 locks are evenly distributed among three nodes; when a node fails, its locks are redistributed among the remaining two nodes. Note that Helix does not reshuffle the locks completely; it simply distributes the locks relinquished by the dead node evenly among the two remaining nodes.
+
+----------------------------------------------------------------------------------------
+
+#### Short version
+This version starts multiple threads within the same process to simulate a multi-node deployment. Try the long version to get a better idea of how it works.
+ 
+```
+git clone https://git-wip-us.apache.org/repos/asf/incubator-helix.git
+cd incubator-helix
+mvn clean install package -DskipTests
+cd recipes/distributed-lock-manager/target/distributed-lock-manager-pkg/bin
+chmod +x *
+./lock-manager-demo
+```
+
+##### Output
+
+```
+./lock-manager-demo 
+STARTING localhost_12000
+STARTING localhost_12002
+STARTING localhost_12001
+STARTED localhost_12000
+STARTED localhost_12002
+STARTED localhost_12001
+localhost_12001 acquired lock:lock-group_3
+localhost_12000 acquired lock:lock-group_8
+localhost_12001 acquired lock:lock-group_2
+localhost_12001 acquired lock:lock-group_4
+localhost_12002 acquired lock:lock-group_1
+localhost_12002 acquired lock:lock-group_10
+localhost_12000 acquired lock:lock-group_7
+localhost_12001 acquired lock:lock-group_5
+localhost_12002 acquired lock:lock-group_11
+localhost_12000 acquired lock:lock-group_6
+localhost_12002 acquired lock:lock-group_0
+localhost_12000 acquired lock:lock-group_9
+lockName    acquired By
+======================================
+lock-group_0    localhost_12002
+lock-group_1    localhost_12002
+lock-group_10    localhost_12002
+lock-group_11    localhost_12002
+lock-group_2    localhost_12001
+lock-group_3    localhost_12001
+lock-group_4    localhost_12001
+lock-group_5    localhost_12001
+lock-group_6    localhost_12000
+lock-group_7    localhost_12000
+lock-group_8    localhost_12000
+lock-group_9    localhost_12000
+Stopping localhost_12000
+localhost_12000Interrupted
+localhost_12001 acquired lock:lock-group_9
+localhost_12001 acquired lock:lock-group_8
+localhost_12002 acquired lock:lock-group_6
+localhost_12002 acquired lock:lock-group_7
+lockName    acquired By
+======================================
+lock-group_0    localhost_12002
+lock-group_1    localhost_12002
+lock-group_10    localhost_12002
+lock-group_11    localhost_12002
+lock-group_2    localhost_12001
+lock-group_3    localhost_12001
+lock-group_4    localhost_12001
+lock-group_5    localhost_12001
+lock-group_6    localhost_12002
+lock-group_7    localhost_12002
+lock-group_8    localhost_12001
+lock-group_9    localhost_12001
+
+```
+
+----------------------------------------------------------------------------------------
+
+#### Long version
+This provides more details on how to set up the cluster and where to plug in application code.
+
+##### Start ZooKeeper
+
+```
+./start-standalone-zookeeper 2199
+```
+
+##### Create a cluster
+
+```
+./helix-admin --zkSvr localhost:2199 --addCluster lock-manager-demo
+```
+
+##### Create a lock group
+
+Create a lock group and specify the number of locks in the lock group. 
+
+```
+./helix-admin --zkSvr localhost:2199  --addResource lock-manager-demo lock-group 6 OnlineOffline AUTO_REBALANCE
+```
+
+##### Start the nodes
+
+Create a Lock class that handles the callbacks. 
+
+```
+
+public class Lock extends StateModel
+{
+  private String lockName;
+
+  public Lock(String lockName)
+  {
+    this.lockName = lockName;
+  }
+
+  public void lock(Message m, NotificationContext context)
+  {
+    System.out.println(" acquired lock:"+ lockName );
+  }
+
+  public void release(Message m, NotificationContext context)
+  {
+    System.out.println(" releasing lock:"+ lockName );
+  }
+
+}
+
+```
+
+A LockFactory that creates the Lock instances:
+ 
+```
+public class LockFactory extends StateModelFactory<Lock>{
+    
+    /* Instantiates the lock handler, one per lockName*/
+    public Lock create(String lockName)
+    {
+        return new Lock(lockName);
+    }   
+}
+```
+
+At node startup, simply join the cluster and Helix will invoke the appropriate callbacks on the Lock instances. You can start any number of nodes; Helix detects when a new node joins the cluster and redistributes the locks automatically.
+
+```
+public class LockProcess{
+
+  public static void main(String[] args) throws Exception {
+    String zkAddress = "localhost:2199";
+    String clusterName = "lock-manager-demo";
+    // Give each process a unique id; the most commonly used format is hostname_port
+    String instanceName = "localhost_12000";
+    ZKHelixAdmin helixAdmin = new ZKHelixAdmin(zkAddress);
+    // configure the instance and provide some metadata
+    InstanceConfig config = new InstanceConfig(instanceName);
+    config.setHostName("localhost");
+    config.setPort("12000");
+    helixAdmin.addInstance(clusterName, config);
+    // join the cluster
+    HelixManager manager;
+    manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                    instanceName,
+                                                    InstanceType.PARTICIPANT,
+                                                    zkAddress);
+    manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", new LockFactory());
+    manager.connect();
+    Thread.currentThread().join();
+  }
+
+}
+```
+
+##### Start the controller
+
+The controller can be started either as a separate process or embedded within each node process.
+
+###### Separate process
+This is recommended when the number of nodes in the cluster exceeds 100. For fault tolerance, you can run multiple controllers on different machines.
+
+```
+./run-helix-controller --zkSvr localhost:2199 --cluster mycluster 2>&1 > /tmp/controller.log &
+```
+
+###### Embedded within the node process
+This is recommended when the number of nodes in the cluster is less than 100. To start a controller from each process, simply add the following lines to LockProcess:
+
+```
+public class LockProcess{
+
+  public static void main(String[] args) throws Exception {
+    String zkAddress= "localhost:2199";
+    String clusterName = "lock-manager-demo";
+    .
+    .
+    manager.connect();
+    HelixManager controller;
+    controller = HelixControllerMain.startHelixController(zkAddress, 
+                                                          clusterName,
+                                                          "controller", 
+                                                          HelixControllerMain.STANDALONE);
+    Thread.currentThread().join();
+  }
+}
+```
+
+----------------------------------------------------------------------------------------
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/task-execution/pom.xml b/recipes/task-execution/pom.xml
new file mode 100644
index 0000000..f604c42
--- /dev/null
+++ b/recipes/task-execution/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.helix.recipes</groupId>
+    <artifactId>recipes</artifactId>
+    <version>0.6.1-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>task-execution-manager</artifactId>
+  <packaging>jar</packaging>
+  <version>1.1-SNAPSHOT</version>
+  <name>Apache Helix :: Recipes :: distributed task execution</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <version>0.6.1-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>redis.clients</groupId>
+      <artifactId>jedis</artifactId>
+      <version>2.1.0</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>appassembler-maven-plugin</artifactId>
+          <configuration>
+            <!-- Set the target configuration directory to be used in the bin scripts -->
+            <!-- <configurationDirectory>conf</configurationDirectory> -->
+            <!-- Copy the contents from "/src/main/config" to the target configuration
+              directory in the assembled application -->
+            <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+            <!-- Include the target configuration directory in the beginning of
+              the classpath declaration in the bin scripts -->
+            <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+            <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+            <!-- Extra JVM arguments that will be included in the bin scripts -->
+            <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+            <!-- Generate bin scripts for windows and unix by default -->
+            <platforms>
+              <platform>windows</platform>
+              <platform>unix</platform>
+            </platforms>
+          </configuration>
+          <executions>
+            <execution>
+              <phase>package</phase>
+              <goals>
+                <goal>assemble</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.rat</groupId>
+          <artifactId>apache-rat-plugin</artifactId>
+          <version>0.8</version>
+            <configuration>
+              <excludes combine.children="append">
+              </excludes>
+            </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+            <program>
+              <mainClass>org.apache.helix.taskexecution.TaskExecutionDemo</mainClass>
+              <name>task-execution-demo</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/config/log4j.properties b/recipes/task-execution/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/recipes/task-execution/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/AnalyticsTaskFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/AnalyticsTaskFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/AnalyticsTaskFactory.java
new file mode 100644
index 0000000..d684043
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/AnalyticsTaskFactory.java
@@ -0,0 +1,59 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public class AnalyticsTaskFactory implements TaskFactory {
+
+	@Override
+	public Task createTask(String id, Set<String> parentIds,
+			HelixManager helixManager, TaskResultStore taskResultStore) {
+		if(id.equalsIgnoreCase("filterImps")) {
+			return new FilterTask(id, parentIds, helixManager, taskResultStore, FilterTask.IMPRESSIONS);
+		}
+		else if(id.equalsIgnoreCase("filterClicks")) {
+			return new FilterTask(id, parentIds, helixManager, taskResultStore, FilterTask.CLICKS);
+		}
+		else if(id.equalsIgnoreCase("impClickJoin")) {
+			return new JoinTask(id, parentIds, helixManager, taskResultStore, FilterTask.FILTERED_IMPRESSIONS, FilterTask.FILTERED_CLICKS);
+		}
+		else if(id.equalsIgnoreCase("impCountsByGender")) {
+			return new CountTask(id, parentIds, helixManager, taskResultStore, FilterTask.FILTERED_IMPRESSIONS, "gender");
+		}
+		else if(id.equalsIgnoreCase("impCountsByCountry")) {
+			return new CountTask(id, parentIds, helixManager, taskResultStore, FilterTask.FILTERED_IMPRESSIONS, "country");
+		}
+		else if(id.equalsIgnoreCase("clickCountsByGender")) {
+			return new CountTask(id, parentIds, helixManager, taskResultStore, JoinTask.JOINED_CLICKS, "gender");
+		}
+		else if(id.equalsIgnoreCase("clickCountsByCountry")) {
+			return new CountTask(id, parentIds, helixManager, taskResultStore, JoinTask.JOINED_CLICKS, "country");
+		}
+		else if(id.equalsIgnoreCase("report")) {
+			return new ReportTask(id, parentIds, helixManager, taskResultStore);
+		}
+		
+		throw new IllegalArgumentException("Cannot create task for " + id);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/CountTask.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/CountTask.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/CountTask.java
new file mode 100644
index 0000000..ac6aff9
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/CountTask.java
@@ -0,0 +1,76 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public class CountTask extends Task {
+
+	private final String _groupByCol;
+	private final String _eventSource;
+
+	public CountTask(String id, Set<String> parentIds, HelixManager helixManager,
+			TaskResultStore resultStore, String eventSource, String groupByCol) {
+		super(id, parentIds, helixManager, resultStore);
+		_eventSource = eventSource;
+		_groupByCol = groupByCol;
+	}
+
+	@Override
+	protected void executeImpl(String resourceName, int numPartitions, int partitionNum) throws Exception {
+		System.out.println("Running AggTask for " + resourceName + "_" + partitionNum + " for " + _eventSource + " " + _groupByCol);
+		if(!(_eventSource.equals(FilterTask.FILTERED_IMPRESSIONS) ||_eventSource.equals(JoinTask.JOINED_CLICKS))) {
+			throw new RuntimeException("Unsupported event source:" + _eventSource);
+		}
+		
+		long len = resultStore.llen(_eventSource);
+		long bucketSize = len/numPartitions;
+		long start = partitionNum * bucketSize;
+		long end = start + bucketSize -1;
+		List<String> events = resultStore.lrange(_eventSource, start, end);
+		Map<String, Integer> counts = new HashMap<String, Integer>();
+		for(String event : events) {
+			String[] fields = event.split(",");
+			if(_groupByCol.equals("gender")) {
+				String gender = (_eventSource.equals(FilterTask.FILTERED_IMPRESSIONS) ? fields[3] : fields[4]);
+				incrementGroupCount(gender, counts);
+			}
+			else if(_groupByCol.equals("country")) {
+				String country = (_eventSource.equals(FilterTask.FILTERED_IMPRESSIONS) ? fields[2] : fields[3]);
+				incrementGroupCount(country, counts);
+			}
+		}
+		
+		for(String key : counts.keySet()) {
+			resultStore.hincrBy(_eventSource + "_" + _groupByCol + "_counts", key, counts.get(key));
+		}
+	}
+	
+	private void incrementGroupCount(String group, Map<String, Integer> counts) {
+		int count = (counts.containsKey(group) ? counts.get(group) : 0);
+		counts.put(group, count+1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Dag.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Dag.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Dag.java
new file mode 100644
index 0000000..2321bc2
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Dag.java
@@ -0,0 +1,125 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class Dag {
+	private Map<String, Node> nodes = new HashMap<String, Dag.Node>();
+
+	public static class Node {
+		private String id;
+		private int numPartitions;
+		private Set<String> parentIds;
+
+		public Node(String id, int numPartitions, Set<String> parentIds) {
+			this.setId(id);
+			this.setNumPartitions(numPartitions);
+			this.setParentIds(parentIds);
+		}
+		
+		public Node(String id, int numPartitions, String parentIdsStr) {
+			this.setId(id);
+			this.setNumPartitions(numPartitions);
+			if(parentIdsStr != null && !parentIdsStr.trim().isEmpty()) {
+				String tmp[] = parentIdsStr.split(",");
+				parentIds = new HashSet<String>();
+				parentIds.addAll(Arrays.asList(tmp));
+			}
+			this.setParentIds(parentIds);
+		}
+		
+		public Node() {
+			setId("");
+			setNumPartitions(0);
+			setParentIds(new HashSet<String>());
+		}
+
+		public String getId() {
+			return id;
+		}
+
+		public int getNumPartitions() {
+			return numPartitions;
+		}
+
+		public Set<String> getParentIds() {
+			return parentIds;
+		}
+		
+		public static Node fromJson(String json) throws Exception {
+			ObjectMapper mapper = new ObjectMapper();
+			return mapper.readValue(json, Node.class);
+		}
+
+		public String toJson() throws Exception {
+			ObjectMapper mapper = new ObjectMapper();
+			return mapper.defaultPrettyPrintingWriter().writeValueAsString(this);
+		}
+
+		public void setId(String id) {
+			this.id = id;
+		}
+
+		public void setNumPartitions(int numPartitions) {
+			this.numPartitions = numPartitions;
+		}
+
+		public void setParentIds(Set<String> parentIds) {
+			this.parentIds = parentIds;
+		}
+	}
+
+	public void addNode(Node node) {
+		getNodes().put(node.getId(), node);
+	}
+	
+	public Node getNode(String id) {
+		return getNodes().get(id);
+	}
+	
+	public Set<String> getNodeIds() {
+		return getNodes().keySet();
+	}
+	
+	public static Dag fromJson(String json) throws Exception {
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.readValue(json, Dag.class);
+	}
+
+	public String toJson() throws Exception {
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.defaultPrettyPrintingWriter().writeValueAsString(this);
+	}
+
+	public Map<String, Node> getNodes() {
+		return nodes;
+	}
+
+	public void setNodes(Map<String, Node> nodes) {
+		this.nodes = nodes;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/FilterTask.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/FilterTask.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/FilterTask.java
new file mode 100644
index 0000000..de56c8a
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/FilterTask.java
@@ -0,0 +1,61 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public class FilterTask extends Task {
+	public static final String IMPRESSIONS = "impressions_demo";
+	public static final String FILTERED_IMPRESSIONS = "filtered_impressions_demo";
+	public static final String CLICKS = "clicks_demo";
+	public static final String FILTERED_CLICKS = "filtered_clicks_demo";
+
+	private final String _dataSource;
+
+	public FilterTask(String id, Set<String> parentIds,
+			HelixManager helixManager, TaskResultStore resultStore, String dataSource) {
+		super(id, parentIds, helixManager, resultStore);
+		_dataSource = dataSource;
+	}
+
+	@Override
+	protected void executeImpl(String resourceName, int numPartitions, int partitionNum) throws Exception {
+		System.out.println("Executing filter task for " + resourceName + "_" + partitionNum + " for " + _dataSource);
+		long len = resultStore.llen(_dataSource);
+		long bucketSize = len/numPartitions;
+		long start = partitionNum * bucketSize;
+		long end = start + bucketSize - 1;
+		List<String> events = resultStore.lrange(_dataSource, start, end);
+		String outputList = (_dataSource.equals(IMPRESSIONS) ? FILTERED_IMPRESSIONS : FILTERED_CLICKS);
+		for(String event : events) {
+			if(!isFraudulent(event)) {
+				resultStore.rpush(outputList, event);
+			}
+		}
+	}
+	
+	private boolean isFraudulent(String event) {
+		String[] fields = event.split(",");
+		return fields[1].equals("true");
+	}
+}
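A note on the slicing above (it is the same in JoinTask): because bucketSize comes from integer division, when the list length is not a multiple of numPartitions the trailing remainder events are never assigned to any partition. That is acceptable for a demo but worth knowing before reusing the pattern. A self-contained sketch of the same arithmetic (class and method names here are illustrative, not part of the recipe):

```java
public class BucketSketch {
    // Same slicing as FilterTask/JoinTask: each partition processes the
    // inclusive range [partitionNum * bucketSize, start + bucketSize - 1].
    static long[] bucket(long len, int numPartitions, int partitionNum) {
        long bucketSize = len / numPartitions;
        long start = partitionNum * bucketSize;
        long end = start + bucketSize - 1;
        return new long[] { start, end };
    }

    public static void main(String[] args) {
        // 10 events over 3 partitions: ranges [0,2], [3,5], [6,8] --
        // event 9 falls into no partition because of the integer division.
        for (int p = 0; p < 3; p++) {
            long[] r = bucket(10, 3, p);
            System.out.println("partition " + p + " -> [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

For len = 10 and 3 partitions the covered ranges are [0,2], [3,5] and [6,8]; element 9 is dropped.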

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/JoinTask.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/JoinTask.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/JoinTask.java
new file mode 100644
index 0000000..207d74c
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/JoinTask.java
@@ -0,0 +1,74 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public class JoinTask extends Task {
+	public static final String JOINED_CLICKS = "joined_clicks_demo";
+
+	private final String _impressionList;
+	private final String _clickList;
+
+	public JoinTask(String id, Set<String> parentIds,
+			HelixManager helixManager, TaskResultStore resultStore, String impressionList, String clickList) {
+		super(id, parentIds, helixManager, resultStore);
+		_impressionList = impressionList;
+		_clickList = clickList;
+	}
+
+	@Override
+	protected void executeImpl(String resourceName, int numPartitions, int partitionNum) throws Exception {
+		System.out.println("Executing JoinTask for " + resourceName + "_" + partitionNum);
+		long numClicks = resultStore.llen(_clickList);
+		List<String> clickEvents = resultStore.lrange(_clickList, 0, numClicks-1);
+		Map<String, String[]> clickIndex = getClickIndex(clickEvents);
+		
+		long len = resultStore.llen(_impressionList);
+		long bucketSize = len / numPartitions; // integer division: trailing remainder events are skipped
+		long start = partitionNum * bucketSize;
+		long end = start + bucketSize - 1;
+		List<String> impressions = resultStore.lrange(_impressionList, start, end);
+		for(String impression : impressions) {
+			String[] fields = impression.split(",");
+			if(clickIndex.containsKey(fields[0])) {
+				String clickId = clickIndex.get(fields[0])[0];
+				String joinedClick = clickId + "," + impression;
+				resultStore.rpush(JOINED_CLICKS, joinedClick);
+			}
+		}
+	}
+
+	// Returns a map from impression id to the fields of the corresponding click event.
+	private Map<String, String[]> getClickIndex(List<String> clickEvents) {
+		Map<String, String[]> clickIndex = new HashMap<String, String[]>();
+		for(String click : clickEvents) {
+			String fields[] = click.split(",");
+			clickIndex.put(fields[2], fields);
+		}
+		return clickIndex;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/RedisTaskResultStore.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/RedisTaskResultStore.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/RedisTaskResultStore.java
new file mode 100644
index 0000000..e3d3b8c
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/RedisTaskResultStore.java
@@ -0,0 +1,107 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class RedisTaskResultStore implements TaskResultStore {
+	private final JedisPool _jedisPool;
+
+	public RedisTaskResultStore(String redisServer, int redisPort, int timeout) {
+		_jedisPool = new JedisPool(new JedisPoolConfig(), redisServer,
+				redisPort, timeout);
+	}
+
+	@Override
+	public boolean exists(String key) throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			return jedis.exists(key);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public void rpush(String key, String value) throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			jedis.rpush(key, value);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public List<String> lrange(String key, long start, long end)
+			throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			return jedis.lrange(key, start, end);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public void ltrim(String key, long start, long end) throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			jedis.ltrim(key, start, end);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public long llen(String key) throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			return jedis.llen(key);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public Long hincrBy(String key, String field, long value) throws Exception {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			return jedis.hincrBy(key, field, value);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+
+	@Override
+	public Map<String, String> hgetAll(String key) {
+		Jedis jedis = _jedisPool.getResource();
+		try {
+			return jedis.hgetAll(key);
+		} finally {
+			_jedisPool.returnResource(jedis);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/ReportTask.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/ReportTask.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/ReportTask.java
new file mode 100644
index 0000000..3886a1f
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/ReportTask.java
@@ -0,0 +1,57 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public class ReportTask extends Task {
+
+	public ReportTask(String id, Set<String> parentIds,
+			HelixManager helixManager, TaskResultStore resultStore) {
+		super(id, parentIds, helixManager, resultStore);
+	}
+
+	@Override
+	protected void executeImpl(String resourceName, int numPartitions,
+			int partitionNum) throws Exception {
+		System.out.println("Running reports task");
+		
+		System.out.println("Impression counts per country");
+		printCounts(FilterTask.FILTERED_IMPRESSIONS + "_country_counts");
+		
+		System.out.println("Click counts per country");
+		printCounts(JoinTask.JOINED_CLICKS + "_country_counts");
+		
+		System.out.println("Impression counts per gender");
+		printCounts(FilterTask.FILTERED_IMPRESSIONS + "_gender_counts");
+
+		System.out.println("Click counts per gender");
+		printCounts(JoinTask.JOINED_CLICKS + "_gender_counts");
+	}
+
+	private void printCounts(String tableName) {
+		Map<String, String> counts = resultStore.hgetAll(tableName);
+		System.out.println(counts);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Task.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Task.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Task.java
new file mode 100644
index 0000000..12af8a6
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Task.java
@@ -0,0 +1,128 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+
+public abstract class Task implements ExternalViewChangeListener {
+	private final CountDownLatch parentDependencyLatch;
+	private final Set<String> completedParentTasks;
+	
+	private static final int TIMEOUT_SECONDS = 5;
+	protected final Set<String> parentIds;
+	protected final String id;
+	private final HelixManager helixManager;
+	protected final TaskResultStore resultStore;
+
+	public Task(String id, Set<String> parentIds, HelixManager helixManager, TaskResultStore resultStore) {
+		this.id = id;
+		this.parentIds = parentIds;
+		this.helixManager = helixManager;
+		this.resultStore = resultStore;
+		parentDependencyLatch = new CountDownLatch(1);
+		completedParentTasks = new HashSet<String>();
+	}
+
+	@Override
+	public void onExternalViewChange(List<ExternalView> externalViewList,
+			NotificationContext changeContext) {
+
+		if(areParentTasksDone(externalViewList)) {
+			parentDependencyLatch.countDown();
+		}
+	}
+
+	private boolean areParentTasksDone(List<ExternalView> externalViewList) {
+		if(parentIds == null || parentIds.size() == 0) {
+			return true;
+		}
+		
+		for (ExternalView ev : externalViewList) {
+			String resourceName = ev.getResourceName();
+			if (parentIds.contains(resourceName)) {
+				if (isParentTaskDone(ev)) {
+					completedParentTasks.add(resourceName);
+				}
+			}
+		}
+		
+		return parentIds.equals(completedParentTasks);
+	}
+
+	private boolean isParentTaskDone(ExternalView ev) {
+		Set<String> partitionSet = ev.getPartitionSet();
+		if(partitionSet.isEmpty()) {
+			return false;
+		}
+		
+		for (String partition : partitionSet) {
+			Map<String, String> stateMap = ev.getStateMap(partition);
+			for (String instance : stateMap.keySet()) {
+				if (!stateMap.get(instance).equalsIgnoreCase("Online")) {
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+	
+	private List<ExternalView> getExternalViews() {
+		String clusterName = helixManager.getClusterName();
+		List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+		HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+		List<String> resourcesInCluster = helixAdmin.getResourcesInCluster(clusterName);
+		for(String resourceName : resourcesInCluster) {
+			ExternalView ev = helixManager.getClusterManagmentTool().getResourceExternalView(clusterName, resourceName);
+			if(ev != null) {
+				externalViewList.add(ev);
+			}
+		}
+		
+		return externalViewList;
+	}
+
+	public final void execute(String resourceName, int numPartitions, int partitionNum) throws Exception {
+		if (!areParentTasksDone(getExternalViews())) {
+			if (!parentDependencyLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+				Set<String> pendingTasks = new HashSet<String>();
+				pendingTasks.addAll(parentIds);
+				pendingTasks.removeAll(completedParentTasks);
+				throw new Exception(id + " timed out while waiting for the following parent tasks to finish: " + pendingTasks);
+			}
+		}
+		
+		executeImpl(resourceName, numPartitions, partitionNum);
+	}
+
+	protected abstract void executeImpl(String resourceName, int numPartitions, int partitionNum) throws Exception;
+
+}
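Task gates execution on its parents through the external view: a parent resource counts as finished only when every one of its partitions reports ONLINE on every hosting instance, and an empty partition set means the parent has not started at all. The core check can be sketched without Helix (illustrative names; the ExternalView is replaced by a plain partition-to-state-map structure):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ParentGateSketch {
    // Mirrors Task.isParentTaskDone: done only when every partition reports
    // ONLINE on every instance; an empty partition set means not started.
    static boolean allOnline(Map<String, Map<String, String>> statesByPartition) {
        if (statesByPartition.isEmpty()) {
            return false;
        }
        for (Map<String, String> stateMap : statesByPartition.values()) {
            for (String state : stateMap.values()) {
                if (!state.equalsIgnoreCase("Online")) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> view = new HashMap<String, Map<String, String>>();
        view.put("filterImps_0", Collections.singletonMap("worker_1", "ONLINE"));
        System.out.println(allOnline(view)); // true: the only partition is ONLINE
        view.put("filterImps_1", Collections.singletonMap("worker_2", "OFFLINE"));
        System.out.println(allOnline(view)); // false: one partition still OFFLINE
    }
}
```

When the gate is not yet open, execute() falls back to the CountDownLatch, which onExternalViewChange releases once the same check passes.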

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskCluster.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskCluster.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskCluster.java
new file mode 100644
index 0000000..313770f
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskCluster.java
@@ -0,0 +1,86 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+public class TaskCluster {
+	public static final String DEFAULT_CLUSTER_NAME = "task-cluster";
+	static final String DEFAULT_STATE_MODEL = "OnlineOffline";
+
+	ZkClient _zkclient = null;
+	ZKHelixAdmin _admin = null;
+	private final String _clusterName;
+
+	public TaskCluster(String zkAddr, String clusterName) throws Exception {
+		_clusterName = clusterName;
+		_zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+				ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+		_admin = new ZKHelixAdmin(_zkclient);
+	}
+
+	public void setup() throws Exception {
+		// add cluster
+		_admin.addCluster(_clusterName, true);
+
+		// add state model definition
+		StateModelConfigGenerator generator = new StateModelConfigGenerator();
+		_admin.addStateModelDef(
+				_clusterName,
+				DEFAULT_STATE_MODEL,
+				new StateModelDefinition(generator
+						.generateConfigForOnlineOffline()));
+
+	}
+
+	public void disconnect() throws Exception {
+		_zkclient.close();
+	}
+	
+	public void submitDag(String dagJson) throws Exception {
+		Dag dag = Dag.fromJson(dagJson);
+		submitDag(dag);
+	}
+
+	public void submitDag(Dag dag) throws Exception {
+		ConfigAccessor clusterConfig = new ConfigAccessor(_zkclient);
+		ConfigScope clusterScope = new ConfigScopeBuilder().forCluster(_clusterName).build();
+		for (String id : dag.getNodeIds()) {
+			Dag.Node node = dag.getNode(id);
+			clusterConfig.set(clusterScope, node.getId(), node.toJson());
+			_admin.addResource(_clusterName, node.getId(),
+					node.getNumPartitions(), DEFAULT_STATE_MODEL,
+					IdealStateModeProperty.AUTO_REBALANCE.toString());
+
+		}
+
+		for (String id : dag.getNodeIds()) {
+			_admin.rebalance(_clusterName, id, 1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskExecutionDemo.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskExecutionDemo.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskExecutionDemo.java
new file mode 100644
index 0000000..4fd3f27
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskExecutionDemo.java
@@ -0,0 +1,159 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.taskexecution.Dag.Node;
+
+public class TaskExecutionDemo {
+
+	public static void main(String[] args) throws Exception {
+		if (args.length != 3) {
+			System.err
+					.println("USAGE: java TaskExecutionDemo zkPort redisHost redisPort");
+			System.exit(1);
+		}
+
+		String redisHost = args[1];
+		int redisPort = Integer.parseInt(args[2]);
+		ZkServer server = null;
+		try {
+			String baseDir = "/tmp/TaskExecutionDemo/";
+			final String dataDir = baseDir + "zk/dataDir";
+			final String logDir = baseDir + "zk/logDir";
+			FileUtils.deleteDirectory(new File(dataDir));
+			FileUtils.deleteDirectory(new File(logDir));
+
+			IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+				@Override
+				public void createDefaultNameSpace(ZkClient zkClient) {
+
+				}
+			};
+
+			int zkPort = Integer.parseInt(args[0]);
+			server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
+			server.start();
+
+			String zkAddr = "localhost:" + zkPort;
+			String clusterName = TaskCluster.DEFAULT_CLUSTER_NAME;
+			
+			TaskCluster taskCluster = new TaskCluster(zkAddr, clusterName);
+			taskCluster.setup();
+
+			startController(zkAddr, clusterName);
+
+			TaskFactory taskFactory = new AnalyticsTaskFactory();
+			TaskResultStore taskResultStore = new RedisTaskResultStore(redisHost, redisPort, 1000);
+			
+			populateDummyData(taskResultStore);
+
+			startWorkers(zkAddr, TaskCluster.DEFAULT_CLUSTER_NAME, taskFactory, taskResultStore);
+
+			Dag dag = getAnalyticsDag();
+			taskCluster.submitDag(dag);
+		} finally {
+			if (server != null) {
+				// server.shutdown(); // intentionally left running so the submitted DAG can keep executing
+			}
+		}
+	}
+
+	private static void populateDummyData(TaskResultStore taskResultStore) throws Exception {
+		float fraudProbability = 0.01f;
+		float clickProbability = 0.01f;
+		int numImps = 10000; 
+		Random rand = new Random();
+		String[] countries = {"US", "CANADA", "UK", "CHINA", "UNKNOWN"};
+		String[] genders = {"M", "F", "UNKNOWN"};
+		for(int i = 0; i < numImps; i++) {
+			boolean isFraudulent = (rand.nextFloat() <= fraudProbability);
+			String impEventId = "" + Math.abs(rand.nextLong());
+			String impEvent = impEventId; //event id
+			impEvent += "," + isFraudulent;
+			impEvent += "," + countries[rand.nextInt(countries.length)];
+			impEvent += "," + genders[rand.nextInt(genders.length)];
+			taskResultStore.rpush(FilterTask.IMPRESSIONS, impEvent);
+			
+			boolean isClick = (rand.nextFloat() <= clickProbability);
+			if(isClick) {
+				String clickEvent = "" + Math.abs(rand.nextLong()); //event id
+				isFraudulent = (rand.nextFloat() <= fraudProbability);
+				clickEvent += "," + isFraudulent;
+				clickEvent += "," + impEventId;
+				taskResultStore.rpush(FilterTask.CLICKS, clickEvent);
+			}
+		}
+		System.out.println("Done populating dummy data");
+	}
+
+	private static void startController(String zkAddr, String clusterName)
+			throws Exception {
+		final HelixManager manager = HelixControllerMain.startHelixController(
+				zkAddr, clusterName, null, HelixControllerMain.STANDALONE);
+
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+			@Override
+			public void run() {
+				System.out.println("Shutting down cluster manager: "
+						+ manager.getInstanceName());
+				manager.disconnect();
+			}
+		});
+	}
+
+	private static void startWorkers(String zkAddr, String clusterName,
+			TaskFactory taskFactory, TaskResultStore taskResultStore) {
+		int numWorkers = 10;
+		Executor executor = Executors.newFixedThreadPool(numWorkers);
+
+		for (int i = 0; i < numWorkers; i++) {
+			Worker worker = new Worker(zkAddr, clusterName, "" + i, taskFactory, taskResultStore);
+			executor.execute(worker);
+		}
+	}
+
+	
+	private static Dag getAnalyticsDag() {
+		Dag dag = new Dag();
+		dag.addNode(new Node("filterImps", 5, ""));
+		dag.addNode(new Node("filterClicks", 5, ""));
+		dag.addNode(new Node("impClickJoin", 5, "filterImps,filterClicks"));
+		dag.addNode(new Node("impCountsByGender", 5, "filterImps"));
+		dag.addNode(new Node("impCountsByCountry", 5, "filterImps"));
+		dag.addNode(new Node("clickCountsByGender", 5, "impClickJoin"));
+		dag.addNode(new Node("clickCountsByCountry", 5, "impClickJoin"));
+		
+		dag.addNode(new Node("report", 1, "impCountsByGender,impCountsByCountry,clickCountsByGender,clickCountsByCountry"));
+		
+		return dag;
+	}
+
+}
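The DAG wired up in getAnalyticsDag() can be sanity-checked with a quick topological sort over the same node ids. This standalone sketch (not part of the recipe) expresses each node's parents exactly as the demo does and schedules a node once all of its parents are done, which is how the external-view gate releases tasks at runtime:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DagOrderSketch {
    // Same node ids and edges as getAnalyticsDag(), as node -> parent ids.
    static Map<String, List<String>> analyticsDag() {
        Map<String, List<String>> parents = new LinkedHashMap<String, List<String>>();
        parents.put("filterImps", Collections.<String> emptyList());
        parents.put("filterClicks", Collections.<String> emptyList());
        parents.put("impClickJoin", Arrays.asList("filterImps", "filterClicks"));
        parents.put("impCountsByGender", Arrays.asList("filterImps"));
        parents.put("impCountsByCountry", Arrays.asList("filterImps"));
        parents.put("clickCountsByGender", Arrays.asList("impClickJoin"));
        parents.put("clickCountsByCountry", Arrays.asList("impClickJoin"));
        parents.put("report", Arrays.asList("impCountsByGender", "impCountsByCountry",
                "clickCountsByGender", "clickCountsByCountry"));
        return parents;
    }

    // Repeatedly schedule every node whose parents have all completed.
    static List<String> topoOrder(Map<String, List<String>> parents) {
        List<String> order = new ArrayList<String>();
        Set<String> done = new HashSet<String>();
        while (done.size() < parents.size()) {
            for (Map.Entry<String, List<String>> e : parents.entrySet()) {
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    order.add(e.getKey());
                    done.add(e.getKey());
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(topoOrder(analyticsDag())); // "report" comes last
    }
}
```

Any valid order ends with "report", since it depends (directly or transitively) on every other node.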

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskFactory.java
new file mode 100644
index 0000000..7779c8b
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskFactory.java
@@ -0,0 +1,28 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+
+public interface TaskFactory {
+	public Task createTask(String id, Set<String> parentIds, HelixManager helixManager, TaskResultStore resultStore);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskResultStore.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskResultStore.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskResultStore.java
new file mode 100644
index 0000000..432a883
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskResultStore.java
@@ -0,0 +1,39 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+public interface TaskResultStore {
+	public boolean exists(String key) throws Exception;
+	
+	public long llen(String key) throws Exception;
+	
+	public void rpush(String key, String value) throws Exception;
+	
+	public List<String> lrange(final String key, final long start, final long end) throws Exception;
+
+	public void ltrim(final String key, final long start, final long end) throws Exception;
+
+	public Long hincrBy(final String key, final String field, final long value) throws Exception;
+
+	public Map<String, String> hgetAll(final String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
new file mode 100644
index 0000000..049b911
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
@@ -0,0 +1,101 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "ERROR" })
+public class TaskStateModel extends StateModel {
+	private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
+
+	private final String _workerId;
+	private final String _partition;
+
+	private final TaskFactory _taskFactory;
+
+	private final TaskResultStore _taskResultStore;
+
+	public TaskStateModel(String workerId, String partition, TaskFactory taskFactory, TaskResultStore taskResultStore) {
+		_partition = partition;
+		_workerId = workerId;
+		_taskFactory = taskFactory;
+		_taskResultStore = taskResultStore;
+	}
+
+	@Transition(to = "ONLINE", from = "OFFLINE")
+	public void onBecomeOnlineFromOffline(Message message,
+			NotificationContext context) throws Exception {
+		LOG.debug(_workerId + " becomes ONLINE from OFFLINE for "
+				+ _partition);
+		ConfigAccessor clusterConfig = context.getManager().getConfigAccessor();
+		HelixManager manager = context.getManager();
+		ConfigScope clusterScope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
+		String json = clusterConfig.get(clusterScope, message.getResourceName());
+		Dag.Node node = Dag.Node.fromJson(json); 
+		Set<String> parentIds = node.getParentIds();
+		String resourceName = message.getResourceName();
+		int numPartitions = node.getNumPartitions();
+		Task task = _taskFactory.createTask(resourceName, parentIds, manager, _taskResultStore);
+		manager.addExternalViewChangeListener(task);
+		
+		LOG.debug("Starting task for " + _partition + "...");
+		int partitionNum = Integer.parseInt(_partition.split("_")[1]);
+		task.execute(resourceName, numPartitions, partitionNum);
+		LOG.debug("Task for " + _partition + " done");
+	}
+
+	@Transition(to = "OFFLINE", from = "ONLINE")
+	public void onBecomeOfflineFromOnline(Message message,
+			NotificationContext context) {
+		LOG.debug(_workerId + " becomes OFFLINE from ONLINE for "
+				+ _partition);
+		
+	}
+
+	@Transition(to = "DROPPED", from = "OFFLINE")
+	public void onBecomeDroppedFromOffline(Message message,
+			NotificationContext context) {
+		LOG.debug(_workerId + " becomes DROPPED from OFFLINE for "
+				+ _partition);
+	}
+
+	@Transition(to = "OFFLINE", from = "ERROR")
+	public void onBecomeOfflineFromError(Message message,
+			NotificationContext context) {
+		LOG.debug(_workerId + " becomes OFFLINE from ERROR for " + _partition);
+	}
+
+	@Override
+	public void reset() {
+		LOG.warn("Default reset() invoked");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
new file mode 100644
index 0000000..c1c9a84
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
@@ -0,0 +1,40 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
+	private final String _workerId;
+	private final TaskFactory _taskFactory;
+	private final TaskResultStore _taskResultStore;
+
+	public TaskStateModelFactory(String workerId, TaskFactory taskFactory, TaskResultStore taskResultStore) {
+		_workerId = workerId;
+		_taskFactory = taskFactory;
+		_taskResultStore = taskResultStore;
+	}
+
+	@Override
+	public TaskStateModel createNewStateModel(String partition) {
+		TaskStateModel model = new TaskStateModel(_workerId, partition, _taskFactory, _taskResultStore);
+		return model;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ec8000e1/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
new file mode 100644
index 0000000..64a59e2
--- /dev/null
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
@@ -0,0 +1,131 @@
+package org.apache.helix.taskexecution;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+
+public class Worker implements Runnable {
+	private final String _zkAddr;
+	private final String _clusterName;
+	private final String _instanceName;
+	private HelixManager _manager = null;
+	private final TaskFactory _taskFactory;
+	private final TaskResultStore _taskResultStore;
+
+	public Worker(String zkAddr, String clusterName, String workerId, TaskFactory taskFactory, TaskResultStore taskResultStore) {
+		_zkAddr = zkAddr;
+		_clusterName = clusterName;
+		_taskResultStore = taskResultStore;
+		_instanceName = "worker_" + workerId;
+		_taskFactory = taskFactory;
+	}
+
+	public void connect() {
+		try {
+			_manager = HelixManagerFactory.getZKHelixManager(_clusterName,
+					_instanceName, InstanceType.PARTICIPANT, _zkAddr);
+
+			StateMachineEngine stateMach = _manager.getStateMachineEngine();
+			TaskStateModelFactory modelFactory = new TaskStateModelFactory(_instanceName, _taskFactory, _taskResultStore);
+			stateMach.registerStateModelFactory(TaskCluster.DEFAULT_STATE_MODEL, modelFactory);
+
+			_manager.connect();
+
+			Thread.currentThread().join(); // block forever; the shutdown hook disconnects on process exit
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt(); // restore the interrupt flag
+			System.err.println(" [-] " + _instanceName + " is interrupted ...");
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			disconnect();
+		}
+	}
+
+	public void disconnect() {
+		if (_manager != null) {
+			_manager.disconnect();
+		}
+	}
+	
+	@Override
+	public void run() {
+		ZkClient zkclient = null;
+		try {
+			// add node to cluster if not already added
+			zkclient = new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+					ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+					new ZNRecordSerializer());
+			ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+			List<String> nodes = admin.getInstancesInCluster(_clusterName);
+			if (!nodes.contains(_instanceName)) {
+				InstanceConfig config = new InstanceConfig(_instanceName);
+				config.setHostName("localhost");
+				config.setInstanceEnabled(true);
+				admin.addInstance(_clusterName, config);
+			}
+
+
+			Runtime.getRuntime().addShutdownHook(new Thread() {
+				@Override
+				public void run() {
+					System.out.println("Shutting down " + _instanceName);
+					disconnect();
+				}
+			});
+
+			connect();
+		} finally {
+			if (zkclient != null) {
+				zkclient.close();
+			}
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		if (args.length != 4) {
+			System.err.println("USAGE: java Worker zookeeperAddress redisServerHost redisServerPort workerId");
+			System.exit(1);
+		}
+
+		final String zkAddr = args[0];
+		final String clusterName = TaskCluster.DEFAULT_CLUSTER_NAME;
+		final String redisServerHost = args[1];
+		final int redisServerPort = Integer.parseInt(args[2]);
+		final String workerId = args[3];
+		
+		TaskFactory taskFactory = new AnalyticsTaskFactory();
+		TaskResultStore taskResultStore = new RedisTaskResultStore(redisServerHost, redisServerPort, 1000);
+		Worker worker = new Worker(zkAddr, clusterName, workerId, taskFactory, taskResultStore);
+		worker.run();
+	}
+
+
+}

