incubator-s4-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shailendra Mishra <shailend...@gmail.com>
Subject Re: [19/22] git commit: utility scripts + refactored twitter app - scripts for defining cluster configurations, starting nodes and uploading apps - removed custom event class for twitter app
Date Fri, 15 Jun 2012 17:35:41 GMT
Which branch do I refresh from to get these fixes ? Should I continue
with git checkout S4-22 or go directly to the piper branch. -
Shailendra

On Fri, Jun 15, 2012 at 9:06 AM,  <mmorel@apache.org> wrote:
> utility scripts + refactored twitter app
> - scripts for defining cluster configurations, starting nodes and
> uploading apps
> - removed custom event class for twitter app
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/91a7fff8
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/91a7fff8
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/91a7fff8
>
> Branch: refs/heads/piper
> Commit: 91a7fff86c1727ae6852d9dd0683d1078a77c86d
> Parents: d11f7fb
> Author: Matthieu Morel <mmorel@apache.org>
> Authored: Mon Mar 26 15:46:20 2012 +0200
> Committer: Matthieu Morel <mmorel@apache.org>
> Committed: Thu Mar 29 13:45:33 2012 +0200
>
> ----------------------------------------------------------------------
>  build.gradle                                       |    3 +-
>  s4                                                 |   44 ++++
>  settings.gradle                                    |    5 +
>  .../java/org/apache/s4/fixtures/CommTestUtils.java |    2 +-
>  .../src/main/java/org/apache/s4/core/App.java      |    2 +
>  .../src/main/java/org/apache/s4/core/Main.java     |   23 ++-
>  .../org/apache/s4/core/adapter/AdapterMain.java    |    8 +-
>  .../java/org/apache/s4/deploy/util/DeployApp.java  |  151 --------------
>  .../main/java/org/apache/s4/fluent/FluentApp.java  |   11 -
>  .../test/java/org/apache/s4/fixtures/ZKServer.java |   67 ------
>  .../org/apache/s4/wordcount/WordClassifierPE.java  |   19 +-
>  .../src/test/resources/default.s4.properties       |    9 +-
>  .../resources/org.apache.s4.deploy.s4.properties   |    4 +-
>  subprojects/s4-tools/s4-tools.gradle               |   48 +++++
>  .../java/org/apache/s4/tools/DefineCluster.java    |   65 ++++++
>  .../src/main/java/org/apache/s4/tools/Deploy.java  |  162 +++++++++++++++
>  .../src/main/java/org/apache/s4/tools/Tools.java   |   22 ++
>  .../main/java/org/apache/s4/tools/ZKServer.java    |   75 +++++++
>  .../s4-counter/src/main/java/s4app/ClockApp.java   |   25 +--
>  .../src/main/java/s4app/ShowTimeApp.java           |   26 +--
>  .../main/java/org/apache/s4/deploy/SimplePE.java   |    2 +-
>  .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
>  .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
>  test-apps/twitter-adapter/README.txt               |    1 +
>  test-apps/twitter-adapter/build.gradle             |    3 +
>  .../s4/example/twitter/TwitterInputAdapter.java    |   19 ++-
>  .../src/main/resources/default.s4.properties       |   18 --
>  .../src/main/resources/s4.properties               |   18 ++
>  .../src/main/resources/twitter4j.properties        |    5 -
>  test-apps/twitter-counter/README.txt               |   33 +++
>  test-apps/twitter-counter/build.gradle             |    1 +
>  .../org/apache/s4/example/twitter/TopNTopicPE.java |   16 +-
>  .../s4/example/twitter/TopicCountAndReportPE.java  |   26 ++-
>  .../s4/example/twitter/TopicExtractorPE.java       |   13 +-
>  .../apache/s4/example/twitter/TopicSeenEvent.java  |   17 --
>  .../s4/example/twitter/TwitterCounterApp.java      |   10 +-
>  .../src/main/resources/default.s4.properties       |   15 ++
>  37 files changed, 602 insertions(+), 370 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/build.gradle
> ----------------------------------------------------------------------
> diff --git a/build.gradle b/build.gradle
> index 7db43d7..fd75057 100644
> --- a/build.gradle
> +++ b/build.gradle
> @@ -72,7 +72,8 @@ libraries = [
>     junit:              'junit:junit:4.10',
>     zkclient:           'com.github.sgroschupf:zkclient:0.1',
>     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
> -    jcommander:         'com.beust:jcommander:1.23'
> +    jcommander:         'com.beust:jcommander:1.23',
> +    commons_io:         'commons-io:commons-io:2.1'
>  ]
>
>  subprojects {
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/s4
> ----------------------------------------------------------------------
> diff --git a/s4 b/s4
> new file mode 100755
> index 0000000..a958cc2
> --- /dev/null
> +++ b/s4
> @@ -0,0 +1,44 @@
> +#!/bin/bash
> +
> +# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
> +
> +echo $0
> +echo $1
> +
> +GRADLE=`pwd`/gradlew
> +
> +case "$1" in
> +"deploy")
> +# examples:
> +# ./s4 deploy -appName=twitter-counter -buildFile=<s4-dir>/test-apps/twitter-counter/build.gradle -cluster=s4-test-cluster
> +# ./s4 deploy -appName=twitter-adapter -buildFile=<s4-dir>/test-apps/twitter-adapter/build.gradle -cluster=s4-adapter-cluster
> +       shift
> +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
> +;;
> +"zkServer")
> +       shift
> +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
> +;;
> +"newCluster")
> +# examples:
> +#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
> +       shift
> +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
> +;;
> +"appNode")
> +# example:
> +# ./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
> +       shift
> +       subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
> +;;
> +"adapterNode")
> +# example:
> +# ./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
> +       shift
> +       subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
> +;;
> +
> +
> +
> +esac
> +
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/settings.gradle
> ----------------------------------------------------------------------
> diff --git a/settings.gradle b/settings.gradle
> index a5655c4..33827ba 100644
> --- a/settings.gradle
> +++ b/settings.gradle
> @@ -17,8 +17,13 @@ include 's4-base'
>  include 's4-core'
>  include 's4-comm'
>  include 's4-example'
> +include 's4-tools'
> +//include 's4-example'
> +//include ':test-apps:simple-adapter-1'
>  include ':test-apps:simple-deployable-app-1'
>  include ':test-apps:simple-deployable-app-2'
> +include ':test-apps:s4-showtime'
> +include ':test-apps:s4-counter'
>
>  rootProject.name = 's4'
>  rootProject.children.each {project ->
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> index e0a9456..7d7913d 100644
> --- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> +++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> @@ -38,7 +38,7 @@ public class CommTestUtils {
>
>     private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
>
> -    public static final int ZK_PORT = 21810;
> +    public static final int ZK_PORT = 2181;
>     public static final int INITIAL_BOOKIE_PORT = 5000;
>     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
>     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> index 2575013..dd46439 100644
> --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> @@ -18,6 +18,7 @@ package org.apache.s4.core;
>  import java.util.ArrayList;
>  import java.util.Arrays;
>  import java.util.List;
> +import java.util.Map;
>  import java.util.concurrent.TimeUnit;
>
>  import org.apache.s4.base.Event;
> @@ -25,6 +26,7 @@ import org.apache.s4.base.KeyFinder;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> +import com.google.common.collect.Maps;
>  import com.google.inject.AbstractModule;
>  import com.google.inject.Guice;
>  import com.google.inject.Inject;
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> index 71756a4..c18603c 100644
> --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> @@ -29,16 +29,19 @@ public class Main {
>      * @param args
>      */
>     public static void main(String[] args) {
> -
> -        if (args.length == 0) {
> -            logger.info("Starting S4 node with default configuration");
> -            startDefaultS4Node();
> -        } else if (args.length == 1) {
> -            logger.info("Starting S4 node with custom configuration from file {}", args[0]);
> -            startCustomS4Node(args[0]);
> -        } else {
> -            logger.info("Starting S4 node in development mode");
> -            startDevelopmentMode(args);
> +        try {
> +            if (args.length == 0) {
> +                logger.info("Starting S4 node with default configuration");
> +                startDefaultS4Node();
> +            } else if (args.length == 1) {
> +                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
> +                startCustomS4Node(args[0]);
> +            } else {
> +                logger.info("Starting S4 node in development mode");
> +                startDevelopmentMode(args);
> +            }
> +        } catch (Exception e) {
> +            logger.error("Cannot start S4 node", e);
>         }
>     }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> index a1c496c..f027f06 100644
> --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> @@ -46,13 +46,7 @@ public class AdapterMain {
>     @Parameters(separators = "=")
>     static class AdapterArgs {
>
> -        @Parameter(names = "-moduleClass", description = "module class name")
> -        String moduleClass;
> -
> -        @Parameter(names = "-adapterClass", description = "adapter class name")
> -        String adapterClass;
> -
> -        @Parameter(names = "-s4Properties", description = "s4 properties file path")
> +        @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
>         String s4PropertiesFilePath;
>     }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> deleted file mode 100644
> index d8b77a4..0000000
> --- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> +++ /dev/null
> @@ -1,151 +0,0 @@
> -package org.apache.s4.deploy.util;
> -
> -import java.io.BufferedReader;
> -import java.io.File;
> -import java.io.IOException;
> -import java.io.InputStreamReader;
> -import java.util.ArrayList;
> -import java.util.Arrays;
> -import java.util.List;
> -
> -import junit.framework.Assert;
> -
> -import org.I0Itec.zkclient.ZkClient;
> -import org.apache.log4j.BasicConfigurator;
> -import org.apache.log4j.Level;
> -import org.apache.log4j.Logger;
> -import org.apache.s4.comm.topology.ZNRecord;
> -import org.apache.s4.comm.topology.ZNRecordSerializer;
> -import org.apache.s4.deploy.DistributedDeploymentManager;
> -import org.apache.zookeeper.CreateMode;
> -import org.slf4j.LoggerFactory;
> -
> -import com.beust.jcommander.JCommander;
> -import com.beust.jcommander.Parameter;
> -import com.beust.jcommander.Parameters;
> -import com.google.common.io.ByteStreams;
> -import com.google.common.io.Files;
> -
> -public class DeployApp {
> -
> -    private static File tmpAppsDir;
> -
> -    /**
> -     * @param args
> -     */
> -    public static void main(String[] args) {
> -
> -        DeployAppArgs appArgs = new DeployAppArgs();
> -        JCommander jc = new JCommander(appArgs);
> -        // configure log4j for Zookeeper
> -        BasicConfigurator.configure();
> -        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> -        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> -
> -        try {
> -            jc.parse(args);
> -        } catch (Exception e) {
> -            jc.usage();
> -            System.exit(-1);
> -        }
> -        try {
> -            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
> -            zkClient.setZkSerializer(new ZNRecordSerializer());
> -
> -            tmpAppsDir = Files.createTempDir();
> -
> -            // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
> -
> -            // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
> -            // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> -            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
> -                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> -
> -            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
> -
> -            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
> -                    + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
> -
> -            final String uri = s4rToDeploy.toURI().toString();
> -            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
> -            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
> -            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
> -
> -        } catch (Exception e) {
> -            LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
> -        }
> -
> -    }
> -
> -    @Parameters(separators = "=")
> -    static class DeployAppArgs {
> -
> -        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
> -        String gradleExecPath;
> -
> -        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
> -        String gradleBuildFilePath;
> -
> -        @Parameter(names = "-appName", description = "name of S4 application", required = true)
> -        String appName;
> -
> -        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
> -        String clusterName;
> -
> -        @Parameter(names = "-zk", description = "zookeeper connection string")
> -        String zkConnectionString = "localhost:2181";
> -
> -    }
> -
> -    static class ExecGradle {
> -
> -        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
> -                throws Exception {
> -            List<String> cmdList = new ArrayList<String>();
> -            cmdList.add(gradlewExecPath);
> -            // cmdList.add("-c");
> -            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
> -            cmdList.add("-b");
> -            cmdList.add(buildFilePath);
> -            cmdList.add(taskName);
> -            if (params.length > 0) {
> -                for (int i = 0; i < params.length; i++) {
> -                    cmdList.add("-P" + params[i]);
> -                }
> -            }
> -
> -            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
> -            ProcessBuilder pb = new ProcessBuilder(cmdList);
> -
> -            pb.directory(new File(buildFilePath).getParentFile());
> -            pb.redirectErrorStream();
> -
> -            final Process process = pb.start();
> -            new Thread(new Runnable() {
> -                @Override
> -                public void run() {
> -                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
> -                    String line;
> -                    try {
> -                        line = br.readLine();
> -                        while (line != null) {
> -                            System.out.println(line);
> -                            line = br.readLine();
> -                        }
> -                    } catch (IOException e) {
> -                        throw new RuntimeException(e);
> -                    }
> -                }
> -            }).start();
> -            process.waitFor();
> -
> -            // try {
> -            // int exitValue = process.exitValue();
> -            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
> -            // } catch (IllegalThreadStateException ignored) {
> -            // }
> -
> -        }
> -    }
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> index 9f66e0d..670c375 100644
> --- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> +++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> @@ -35,15 +35,4 @@ public class FluentApp extends App {
>         appMaker.close();
>     }
>
> -    public void start() {
> -        super.start();
> -    }
> -
> -    public void init() {
> -        super.init();
> -    }
> -
> -    public void close() {
> -        super.close();
> -    }
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> deleted file mode 100644
> index b79f1a0..0000000
> --- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> +++ /dev/null
> @@ -1,67 +0,0 @@
> -package org.apache.s4.fixtures;
> -
> -import java.util.Arrays;
> -
> -import org.apache.s4.comm.tools.TaskSetup;
> -import org.slf4j.Logger;
> -import org.slf4j.LoggerFactory;
> -
> -import com.beust.jcommander.JCommander;
> -import com.beust.jcommander.Parameter;
> -import com.beust.jcommander.Parameters;
> -
> -public class ZKServer {
> -
> -    private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
> -
> -    /**
> -     * @param args
> -     */
> -    public static void main(String[] args) {
> -        ZKServerArgs clusterArgs = new ZKServerArgs();
> -        JCommander jc = new JCommander(clusterArgs);
> -        try {
> -            jc.parse(args);
> -        } catch (Exception e) {
> -            System.out.println(Arrays.toString(args));
> -            e.printStackTrace();
> -            jc.usage();
> -            System.exit(-1);
> -        }
> -        try {
> -
> -            logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
> -                    clusterArgs.nbTasks);
> -            if (clusterArgs.startZK) {
> -                CommTestUtils.startZookeeperServer();
> -            }
> -            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
> -            taskSetup.clean(clusterArgs.clusterName);
> -            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
> -            logger.info("Zookeeper started");
> -        } catch (Exception e) {
> -            logger.error("Cannot initialize zookeeper with specified configuration", e);
> -        }
> -
> -    }
> -
> -    @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
> -    static class ZKServerArgs {
> -
> -        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
> -        String clusterName = "s4-test-cluster";
> -
> -        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
> -        int nbTasks = 1;
> -
> -        @Parameter(names = "-zk", description = "Zookeeper connection string")
> -        String zkConnectionString = "localhost:21810";
> -
> -        @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
> -        boolean startZK = false;
> -
> -        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
> -        int firstListeningPort = -1;
> -    }
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> index 820dbe5..dffe6cb 100644
> --- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> +++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> @@ -1,6 +1,5 @@
>  package org.apache.s4.wordcount;
>
> -
>  import java.io.File;
>  import java.io.IOException;
>  import java.util.Map.Entry;
> @@ -18,25 +17,25 @@ import org.apache.zookeeper.Watcher;
>  import org.apache.zookeeper.ZooDefs.Ids;
>  import org.apache.zookeeper.ZooKeeper;
>
> -
>  public class WordClassifierPE extends ProcessingElement implements Watcher {
>
>     TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
>     private int counter;
>     transient private ZooKeeper zk;
>
> -    private WordClassifierPE () {}
> +    private WordClassifierPE() {
> +    }
>
>     public WordClassifierPE(App app) {
>         super(app);
>     }
> -
> +
>     public void onEvent(WordCountEvent event) {
>         try {
>             WordCountEvent wcEvent = event;
>             if (zk == null) {
>                 try {
> -                    zk = new ZooKeeper("localhost:21810", 4000, this);
> +                    zk = new ZooKeeper("localhost:2181", 4000, this);
>                 } catch (IOException e) {
>                     throw new RuntimeException(e);
>                 }
> @@ -73,8 +72,8 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
>                 // zookeeper
>                 zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
>                         CreateMode.PERSISTENT);
> -                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
> -                System.out.println("wrote classifier iteration ["+counter+"]");
> +                Logger.getLogger("s4-ft").debug("wrote classifier iteration [" + counter + "]");
> +                System.out.println("wrote classifier iteration [" + counter + "]");
>                 // check if we are allowed to continue
>                 if (null == zk.exists("/continue_" + counter, null)) {
>                     CountDownLatch latch = new CountDownLatch(1);
> @@ -96,19 +95,19 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
>     @Override
>     public void process(WatchedEvent event) {
>         // TODO Auto-generated method stub
> -
> +
>     }
>
>     @Override
>     protected void onCreate() {
>         // TODO Auto-generated method stub
> -
> +
>     }
>
>     @Override
>     protected void onRemove() {
>         // TODO Auto-generated method stub
> -
> +
>     }
>
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/default.s4.properties
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
> index 62fc7d5..2235032 100644
> --- a/subprojects/s4-core/src/test/resources/default.s4.properties
> +++ b/subprojects/s4-core/src/test/resources/default.s4.properties
> @@ -4,6 +4,13 @@ cluster.hosts = localhost
>  cluster.ports = 5077
>  cluster.lock_dir = {user.dir}/tmp
>  cluster.name = s4-test-cluster
> -cluster.zk_address = localhost:21810
> +cluster.zk_address = localhost:2181
>  cluster.zk_session_timeout = 10000
>  cluster.zk_connection_timeout = 10000
> +s4.logger_level = DEBUG
> +comm.module = org.apache.s4.core.CustomModule
> +appsDir=/tmp/deploy-test
> +tcp.partition.queue_size=1000
> +comm.timeout=100
> +comm.retry_delay=100
> +comm.retries=10
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> index 9a5d11a..3a566f2 100644
> --- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> +++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> @@ -3,9 +3,9 @@ comm.queue_listener_size = 8000
>  cluster.hosts = localhost
>  cluster.ports = 5077
>  cluster.name = s4-test-cluster
> -cluster.zk_address = localhost:21810
> +cluster.zk_address = localhost:2181
>  cluster.zk_session_timeout = 10000
>  cluster.zk_connection_timeout = 10000
>  comm.module = org.apache.s4.core.adapter.AdapterModule
> -s4.logger_level = DEBUG
> +s4.logger_level = TRACE
>  appsDir=/tmp/deploy-test
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/s4-tools.gradle
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
> new file mode 100644
> index 0000000..68c728e
> --- /dev/null
> +++ b/subprojects/s4-tools/s4-tools.gradle
> @@ -0,0 +1,48 @@
> +/*
> + * Copyright 2010 the original author or authors.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +description = 'The S4 core platform.'
> +
> +apply plugin: 'java'
> +
> +task "create-dirs" << {
> +   sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
> +   sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
> +}
> +
> +dependencies {
> +    compile project(":s4-base")
> +    compile project(":s4-comm")
> +    compile project(":s4-core")
> +    compile libraries.jcommander
> +    compile libraries.zkclient
> +    compile libraries.commons_io
> +}
> +
> +apply plugin:'application'
> +mainClassName = "org.apache.s4.tools.Tools"
> +
> +run {
> +    // run doesn't yet directly accept command line parameters...
> +    if ( project.hasProperty('args') ) {
> +        args project.args.split('\\s+')
> +        print args
> +    }
> + }
> +
> +test {
> +    forkEvery=1
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> new file mode 100644
> index 0000000..4fc09f0
> --- /dev/null
> +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> @@ -0,0 +1,65 @@
> +package org.apache.s4.tools;
> +
> +import java.util.Arrays;
> +
> +import org.apache.log4j.BasicConfigurator;
> +import org.apache.log4j.Level;
> +import org.apache.s4.comm.tools.TaskSetup;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import com.beust.jcommander.JCommander;
> +import com.beust.jcommander.Parameter;
> +import com.beust.jcommander.Parameters;
> +
> +public class DefineCluster {
> +
> +    static Logger logger = LoggerFactory.getLogger(DefineCluster.class);
> +
> +    public static void main(String[] args) {
> +        // configure log4j for Zookeeper
> +        BasicConfigurator.configure();
> +        org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> +        org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> +
> +        ZKServerArgs clusterArgs = new ZKServerArgs();
> +        JCommander jc = new JCommander(clusterArgs);
> +        try {
> +            jc.parse(args);
> +        } catch (Exception e) {
> +            System.out.println(Arrays.toString(args));
> +            e.printStackTrace();
> +            jc.usage();
> +            System.exit(-1);
> +        }
> +        try {
> +
> +            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbTasks);
> +
> +            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
> +            taskSetup.clean(clusterArgs.clusterName);
> +            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
> +            logger.info("New cluster configuration uploaded into zookeeper");
> +        } catch (Exception e) {
> +            logger.error("Cannot initialize zookeeper with specified configuration", e);
> +        }
> +
> +    }
> +
> +    @Parameters(separators = "=", commandDescription = "Setup new S4 logical cluster")
> +    static class ZKServerArgs {
> +
> +        @Parameter(names = "-name", description = "S4 cluster name", required = true)
> +        String clusterName = "s4-test-cluster";
> +
> +        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
> +        int nbTasks = 1;
> +
> +        @Parameter(names = "-zk", description = "Zookeeper connection string")
> +        String zkConnectionString = "localhost:2181";
> +
> +        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
> +        int firstListeningPort = -1;
> +    }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> new file mode 100644
> index 0000000..1b2eb7d
> --- /dev/null
> +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> @@ -0,0 +1,162 @@
> +package org.apache.s4.tools;
> +
> +import java.io.BufferedReader;
> +import java.io.File;
> +import java.io.IOException;
> +import java.io.InputStreamReader;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.List;
> +
> +import junit.framework.Assert;
> +
> +import org.I0Itec.zkclient.ZkClient;
> +import org.apache.log4j.BasicConfigurator;
> +import org.apache.log4j.Level;
> +import org.apache.log4j.Logger;
> +import org.apache.s4.comm.topology.ZNRecord;
> +import org.apache.s4.comm.topology.ZNRecordSerializer;
> +import org.apache.s4.deploy.DistributedDeploymentManager;
> +import org.apache.zookeeper.CreateMode;
> +import org.slf4j.LoggerFactory;
> +
> +import com.beust.jcommander.JCommander;
> +import com.beust.jcommander.Parameter;
> +import com.beust.jcommander.Parameters;
> +import com.google.common.io.ByteStreams;
> +import com.google.common.io.Files;
> +
> +public class Deploy {
> +
> +    private static File tmpAppsDir;
> +    static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
> +
> +    /**
> +     * @param args
> +     */
> +    public static void main(String[] args) {
> +
> +        DeployAppArgs appArgs = new DeployAppArgs();
> +        JCommander jc = new JCommander(appArgs);
> +        // configure log4j for Zookeeper
> +        BasicConfigurator.configure();
> +        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> +        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> +
> +        try {
> +            jc.parse(args);
> +        } catch (Exception e) {
> +            e.printStackTrace();
> +            jc.usage();
> +            System.exit(-1);
> +        }
> +        try {
> +            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString, appArgs.timeout);
> +            zkClient.setZkSerializer(new ZNRecordSerializer());
> +
> +            tmpAppsDir = Files.createTempDir();
> +
> +            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
> +
> +            String generatedS4RPath = null;
> +
> +            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
> +                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> +            generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r";
> +
> +            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
> +                    Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
> +
> +            final String uri = s4rToDeploy.toURI().toString();
> +            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
> +            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
> +            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
> +            logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
> +                    appArgs.appName, appArgs.clusterName, "/" + appArgs.clusterName + "/apps/" + appArgs.appName });
> +
> +        } catch (Exception e) {
> +            LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
> +        }
> +
> +    }
> +
> +    @Parameters(separators = "=")
> +    static class DeployAppArgs {
> +
> +        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
> +        String gradleExecPath;
> +
> +        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
> +        String gradleBuildFilePath;
> +
> +        @Parameter(names = "-appName", description = "name of S4 application", required = true)
> +        String appName;
> +
> +        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
> +        String clusterName;
> +
> +        @Parameter(names = "-zk", description = "zookeeper connection string")
> +        String zkConnectionString = "localhost:2181";
> +
> +        @Parameter(names = "-timeout", description = "connection timeout to Zookeeper, in ms")
> +        int timeout = 10000;
> +
> +    }
> +
> +    static class ExecGradle {
> +
> +        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
> +                throws Exception {
> +            // Thread.sleep(10000);
> +            List<String> cmdList = new ArrayList<String>();
> +            // cmdList.add("sleep");
> +            // cmdList.add("2");
> +            // cmdList.add(";");
> +            cmdList.add(gradlewExecPath);
> +            // cmdList.add("-c");
> +            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
> +            cmdList.add("-b");
> +            cmdList.add(buildFilePath);
> +            cmdList.add(taskName);
> +            cmdList.add("-stacktrace");
> +            if (params.length > 0) {
> +                for (int i = 0; i < params.length; i++) {
> +                    cmdList.add("-P" + params[i]);
> +                }
> +            }
> +            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
> +            ProcessBuilder pb = new ProcessBuilder(cmdList);
> +
> +            pb.directory(new File(buildFilePath).getParentFile());
> +            pb.redirectErrorStream();
> +
> +            final Process process = pb.start();
> +            new Thread(new Runnable() {
> +                @Override
> +                public void run() {
> +                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
> +                    String line;
> +                    try {
> +                        line = br.readLine();
> +                        while (line != null) {
> +                            System.out.println(line);
> +                            line = br.readLine();
> +                        }
> +                    } catch (IOException e) {
> +                        throw new RuntimeException(e);
> +                    }
> +                }
> +            }).start();
> +
> +            process.waitFor();
> +
> +            // try {
> +            // int exitValue = process.exitValue();
> +            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
> +            // } catch (IllegalThreadStateException ignored) {
> +            // }
> +
> +        }
> +    }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> new file mode 100644
> index 0000000..ae53cb6
> --- /dev/null
> +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> @@ -0,0 +1,22 @@
> +package org.apache.s4.tools;
> +
> +import java.lang.reflect.Method;
> +import java.util.Arrays;
> +
> +public class Tools {
> +
> +    public static void main(String[] args) {
> +        try {
> +            Class<?> toolClass = Class.forName(args[0]);
> +            Method main = toolClass.getMethod("main", String[].class);
> +            if (args.length > 1) {
> +                main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
> +            } else {
> +                main.invoke(null, new Object[] { new String[0] });
> +            }
> +        } catch (Exception e) {
> +            // TODO Auto-generated catch block
> +            e.printStackTrace();
> +        }
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> ----------------------------------------------------------------------
> diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> new file mode 100644
> index 0000000..cb4c85c
> --- /dev/null
> +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> @@ -0,0 +1,75 @@
> +package org.apache.s4.tools;
> +
> +import java.io.File;
> +import java.util.Arrays;
> +
> +import org.I0Itec.zkclient.IDefaultNameSpace;
> +import org.I0Itec.zkclient.ZkClient;
> +import org.I0Itec.zkclient.ZkServer;
> +import org.apache.commons.io.FileUtils;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import com.beust.jcommander.JCommander;
> +import com.beust.jcommander.Parameter;
> +import com.beust.jcommander.Parameters;
> +
> +public class ZKServer {
> +
> +    static Logger logger = LoggerFactory.getLogger(ZKServer.class);
> +
> +    public static void main(String[] args) {
> +        ZKServerArgs zkArgs = new ZKServerArgs();
> +        JCommander jc = new JCommander(zkArgs);
> +        try {
> +            jc.parse(args);
> +        } catch (Exception e) {
> +            System.out.println(Arrays.toString(args));
> +            e.printStackTrace();
> +            jc.usage();
> +            System.exit(-1);
> +        }
> +        try {
> +
> +            logger.info("Starting zookeeper server on port [{}]", zkArgs.zkPort);
> +
> +            if (zkArgs.clean) {
> +                logger.info("cleaning existing data in [{}] and [{}]", zkArgs.dataDir, zkArgs.logDir);
> +                FileUtils.deleteDirectory(new File(zkArgs.dataDir));
> +                FileUtils.deleteDirectory(new File(zkArgs.logDir));
> +            }
> +            IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
> +
> +                @Override
> +                public void createDefaultNameSpace(ZkClient zkClient) {
> +
> +                }
> +            };
> +
> +            ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace);
> +            zkServer.start();
> +        } catch (Exception e) {
> +            logger.error("Cannot initialize zookeeper with specified configuration", e);
> +        }
> +    }
> +
> +    @Parameters(separators = "=", commandDescription = "Start Zookeeper server")
> +    static class ZKServerArgs {
> +
> +        @Parameter(names = "-port", description = "Zookeeper port")
> +        String zkPort = "2181";
> +
> +        @Parameter(names = "-dataDir", description = "data directory", required = false)
> +        String dataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
> +                + "zookeeper" + File.separator + "data").getAbsolutePath();
> +
> +        @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
> +        boolean clean = true;
> +
> +        @Parameter(names = "-logDir", description = "log directory")
> +        String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
> +                + "zookeeper" + File.separator + "log").getAbsolutePath();
> +
> +    }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> index 316b876..cab4eab 100644
> --- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> +++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> @@ -12,44 +12,27 @@ public class ClockApp extends App {
>     private ClockPE clockPE;
>
>     @Override
> -    protected void start() {
> +    protected void onStart() {
>         System.out.println("Starting CounterApp...");
>         clockPE.getInstanceForKey("single");
>     }
>
>     // generic array due to varargs generates a warning.
>     @Override
> -    protected void init() {
> +    protected void onInit() {
>         System.out.println("Initing CounterApp...");
>
>         clockPE = new ClockPE(this);
>         clockPE.setTimerInterval(1, TimeUnit.SECONDS);
>
> -        eventSource = new EventSource(this, "I can give you the time!");
> +        eventSource = new EventSource(this, "clockStream");
>         clockPE.setStreams((Streamable) eventSource);
>     }
>
>     @Override
> -    protected void close() {
> +    protected void onClose() {
>         System.out.println("Closing CounterApp...");
>         eventSource.close();
>     }
>
> -    @Override
> -    protected void onStart() {
> -        // TODO Auto-generated method stub
> -
> -    }
> -
> -    @Override
> -    protected void onInit() {
> -        // TODO Auto-generated method stub
> -
> -    }
> -
> -    @Override
> -    protected void onClose() {
> -        // TODO Auto-generated method stub
> -
> -    }
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> index 1c92c57..6652f59 100644
> --- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> +++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> @@ -7,41 +7,23 @@ public class ShowTimeApp extends App {
>     private ShowPE showPE;
>
>     @Override
> -    protected void start() {
> +    protected void onStart() {
>         System.out.println("Starting ShowTimeApp...");
>         showPE.getInstanceForKey("single");
>     }
>
>     @Override
> -    protected void init() {
> +    protected void onInit() {
>         System.out.println("Initing ShowTimeApp...");
>
>         showPE = new ShowPE(this);
>
>         /* This stream will receive events from another app. */
> -        createStream("I need the time.", showPE);
> -    }
> -
> -    @Override
> -    protected void close() {
> -        System.out.println("Closing ShowTimeApp...");
> -    }
> -
> -    @Override
> -    protected void onStart() {
> -        // TODO Auto-generated method stub
> -
> -    }
> -
> -    @Override
> -    protected void onInit() {
> -        // TODO Auto-generated method stub
> -
> +        createStream("clockStream", showPE);
>     }
>
>     @Override
>     protected void onClose() {
> -        // TODO Auto-generated method stub
> -
> +        System.out.println("Closing ShowTimeApp...");
>     }
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> index 809cebd..8cb5843 100644
> --- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> +++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> @@ -41,7 +41,7 @@ public class SimplePE extends ProcessingElement implements Watcher {
>     protected void onCreate() {
>         if (zk == null) {
>             try {
> -                zk = new ZooKeeper("localhost:" + 21810, 4000, this);
> +                zk = new ZooKeeper("localhost:" + 2181, 4000, this);
>             } catch (IOException e) {
>                 throw new RuntimeException(e);
>             }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> index 25b98a6..bbfaf17 100644
> --- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> +++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> @@ -39,7 +39,7 @@ public class TestApp extends App {
>             } catch (IOException e) {
>                 throw new RuntimeException(e);
>             }
> -            zkClient = new ZkClient("localhost:" + 21810);
> +            zkClient = new ZkClient("localhost:" + 2181);
>             if (!zkClient.exists("/s4-test")) {
>                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
>             }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> index 0657ee9..8eb345b 100644
> --- a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> +++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> @@ -17,7 +17,7 @@ public class TestApp extends App {
>     @Override
>     protected void onInit() {
>         try {
> -            zkClient = new ZkClient("localhost:" + 21810);
> +            zkClient = new ZkClient("localhost:" + 2181);
>             if (!zkClient.exists("/s4-test")) {
>                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
>             }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/README.txt
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/README.txt b/test-apps/twitter-adapter/README.txt
> new file mode 100644
> index 0000000..f9bda45
> --- /dev/null
> +++ b/test-apps/twitter-adapter/README.txt
> @@ -0,0 +1 @@
> +Please refer to README.txt in twitter-counter application
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/build.gradle
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
> index 669d62b..100837d 100644
> --- a/test-apps/twitter-adapter/build.gradle
> +++ b/test-apps/twitter-adapter/build.gradle
> @@ -49,6 +49,9 @@ group = 'org.apache.s4'
>
>  apply plugin: 'java'
>  apply plugin: 'eclipse'
> +apply plugin:'application'
> +
> +mainClassName = "org.apache.s4.core.Main"
>
>  /* The app classname is set automatically from the source files. */
>  def appClassname = ''
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> index ee905d9..102ca10 100644
> --- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> +++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> @@ -1,6 +1,9 @@
>  package org.apache.s4.example.twitter;
>
> +import java.io.File;
> +import java.io.FileInputStream;
>  import java.net.ServerSocket;
> +import java.util.Properties;
>  import java.util.concurrent.LinkedBlockingQueue;
>
>  import org.I0Itec.zkclient.ZkClient;
> @@ -15,6 +18,7 @@ import twitter4j.StatusDeletionNotice;
>  import twitter4j.StatusListener;
>  import twitter4j.TwitterStream;
>  import twitter4j.TwitterStreamFactory;
> +import twitter4j.conf.ConfigurationBuilder;
>
>  public class TwitterInputAdapter extends Adapter {
>
> @@ -49,7 +53,20 @@ public class TwitterInputAdapter extends Adapter {
>
>     public void connectAndRead() throws Exception {
>
> -        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
> +        ConfigurationBuilder cb = new ConfigurationBuilder();
> +        Properties twitterProperties = new Properties();
> +        File twitter4jPropsFile = new File(System.getProperty("user.home") + "/twitter4j.properties");
> +        if (!twitter4jPropsFile.exists()) {
> +            logger.error(
> +                    "Cannot find twitter4j.properties file in this location :[{}]. Make sure it is available at this place and includes user/password credentials",
> +                    twitter4jPropsFile.getAbsolutePath());
> +            return;
> +        }
> +        twitterProperties.load(new FileInputStream(twitter4jPropsFile));
> +
> +        cb.setDebugEnabled(Boolean.valueOf(twitterProperties.getProperty("debug")))
> +                .setUser(twitterProperties.getProperty("user")).setPassword(twitterProperties.getProperty("password"));
> +        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
>         StatusListener statusListener = new StatusListener() {
>
>             @Override
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> deleted file mode 100644
> index cd36aaa..0000000
> --- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> +++ /dev/null
> @@ -1,18 +0,0 @@
> -comm.queue_emmiter_size = 8000
> -comm.queue_listener_size = 8000
> -cluster.hosts = localhost
> -cluster.ports = 5077
> -cluster.name = s4-adapter-cluster
> -cluster.zk_address = localhost:21810
> -cluster.zk_session_timeout = 10000
> -cluster.zk_connection_timeout = 10000
> -comm.module = org.apache.s4.deploy.TestModule
> -s4.logger_level = TRACE
> -appsDir=/tmp/deploy-test
> -tcp.partition.queue_size=1000
> -comm.timeout=100
> -comm.retry_delay=100
> -comm.retries=10
> -
> -# specify the name of the remote cluster (there is currently only 1 remote cluster max)
> -cluster.remote.name=s4-test-cluster
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/s4.properties
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/src/main/resources/s4.properties b/test-apps/twitter-adapter/src/main/resources/s4.properties
> new file mode 100644
> index 0000000..fdd1bf6
> --- /dev/null
> +++ b/test-apps/twitter-adapter/src/main/resources/s4.properties
> @@ -0,0 +1,18 @@
> +comm.queue_emmiter_size = 8000
> +comm.queue_listener_size = 8000
> +cluster.hosts = localhost
> +cluster.ports = 5077
> +cluster.name = s4-adapter-cluster
> +cluster.zk_address = localhost:2181
> +cluster.zk_session_timeout = 10000
> +cluster.zk_connection_timeout = 10000
> +comm.module = org.apache.s4.core.adapter.AdapterModule
> +s4.logger_level = DEBUG
> +appsDir=/tmp/deploy-test
> +tcp.partition.queue_size=1000
> +comm.timeout=100
> +comm.retry_delay=100
> +comm.retries=10
> +
> +# specify the name of the remote cluster (there is currently only 1 remote cluster max)
> +cluster.remote.name=s4-test-cluster
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> deleted file mode 100644
> index 7d58c7d..0000000
> --- a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> +++ /dev/null
> @@ -1,5 +0,0 @@
> -debug=true
> -# you need to set those parameters with valid twitter account credentials
> -twitter4j.user=????
> -twitter4j.password=????
> -
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/README.txt
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/README.txt b/test-apps/twitter-counter/README.txt
> new file mode 100644
> index 0000000..d46b491
> --- /dev/null
> +++ b/test-apps/twitter-counter/README.txt
> @@ -0,0 +1,33 @@
> +An application that displays the current top 10 topics, as gathered from the twitter sample stream.
> +It was ported and adapted from S4 0.3
> +
> +Architecture:
> +- twitter-adapter app in adapter node connects to the twitter stream, extracts the twitted text and passes that to the application cluster
> +- twitter-counter app in the application cluster receives the text of the tweets, extracts the topics, counts topic occurences and periodically displays the top 10 topics on the console
> +
> +How to configure:
> +- you need a twitter4j.properties file in your home dir, with the following properties filled:
> +debug=true|false
> +user=<a twitter user name>
> +password=<the matching password>
> +
> +How to run:
> +0/ make sure tools are compiled by running ./gradlew s4-tools:installApp
> +
> +1/ start zookeeper server
> +./s4 zkServer
> +
> +2/ create adapter cluster configuration
> +./s4 newCluster -name=s4-test-cluster -firstListeningPort=10000 -nbTasks=1
> +
> +3/ create application cluster configuration
> +./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=11000 -nbTasks=<number of nodes>
> +NOTE: - the name of the downstream cluster is currently hardcoded in <s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties Make sure you use the same name
> +
> +4/ start application nodes (as many as defined in the cluster configuration, more for failover capabilities)
> +./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
> +
> +5/ start adapter node
> +./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
> +
> +6/ observe the topic count output (on 1 of the application nodes)
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/build.gradle
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
> index e737b40..e10eae0 100644
> --- a/test-apps/twitter-counter/build.gradle
> +++ b/test-apps/twitter-counter/build.gradle
> @@ -49,6 +49,7 @@ group = 'org.apache.s4'
>
>  apply plugin: 'java'
>  apply plugin: 'eclipse'
> +apply plugin:'application'
>
>  /* The app classname is set automatically from the source files. */
>  def appClassname = ''
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> index 3d9a9fb..6fd68b5 100644
> --- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> +++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> @@ -4,6 +4,7 @@ import java.util.Iterator;
>  import java.util.Map;
>  import java.util.TreeSet;
>
> +import org.apache.s4.base.Event;
>  import org.apache.s4.core.App;
>  import org.apache.s4.core.ProcessingElement;
>  import org.slf4j.Logger;
> @@ -14,16 +15,16 @@ import com.google.common.collect.Sets;
>
>  public class TopNTopicPE extends ProcessingElement {
>
> +    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
> +    Map<String, Integer> countedTopics = Maps.newHashMap();
> +
>     public TopNTopicPE(App app) {
>         super(app);
> -        // TODO Auto-generated constructor stub
> +        logger.info("key: [{}]", getId());
>     }
>
> -    Map<String, Integer> countedTopics = Maps.newHashMap();
> -    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
> -
> -    public void onEvent(TopicSeenEvent event) {
> -        countedTopics.put(event.topic, event.count);
> +    public void onEvent(Event event) {
> +        countedTopics.put(event.get("topic"), event.get("count", Integer.class));
>     }
>
>     public void onTime() {
> @@ -32,6 +33,8 @@ public class TopNTopicPE extends ProcessingElement {
>             sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
>         }
>
> +        logger.info("\n------------------");
> +
>         int i = 0;
>         Iterator<TopNEntry> iterator = sortedTopics.iterator();
>         long time = System.currentTimeMillis();
> @@ -39,6 +42,7 @@ public class TopNTopicPE extends ProcessingElement {
>             TopNEntry entry = iterator.next();
>             logger.info("{} : topic [{}] count [{}]",
>                     new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
> +            i++;
>         }
>     }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> index a6d1478..5a41231 100644
> --- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> +++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> @@ -1,27 +1,36 @@
>  package org.apache.s4.example.twitter;
>
> +import org.apache.s4.base.Event;
>  import org.apache.s4.core.App;
>  import org.apache.s4.core.ProcessingElement;
>  import org.apache.s4.core.Stream;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
>  // keyed by topic name
>  public class TopicCountAndReportPE extends ProcessingElement {
>
> -    Stream<TopicSeenEvent> downStream;
> +    Stream<Event> downStream;
>     int threshold = 10;
>     int count;
> +    boolean firstEvent = true;
> +
> +    static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
>
>     public TopicCountAndReportPE(App app) {
>         super(app);
> -        // TODO Auto-generated constructor stub
>     }
>
> -    public void setDownstream(Stream<TopicSeenEvent> stream) {
> +    public void setDownstream(Stream<Event> stream) {
>         this.downStream = stream;
>     }
>
> -    public void onEvent(TopicSeenEvent event) {
> -        count += event.count;
> +    public void onEvent(Event event) {
> +        if (firstEvent) {
> +            logger.info("Handling new topic [{}]", getId());
> +            firstEvent = false;
> +        }
> +        count += event.get("count", Integer.class);
>     }
>
>     @Override
> @@ -34,14 +43,15 @@ public class TopicCountAndReportPE extends ProcessingElement {
>         if (count < threshold) {
>             return;
>         }
> -        TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(), count);
> +        Event topicSeenEvent = new Event();
> +        topicSeenEvent.put("topic", String.class, getId());
> +        topicSeenEvent.put("count", Integer.class, count);
> +        topicSeenEvent.put("aggregationKey", String.class, "aggregationValue");
>         downStream.put(topicSeenEvent);
>     }
>
>     @Override
>     protected void onRemove() {
> -        // TODO Auto-generated method stub
> -
>     }
>
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> index 501d6fd..9936ed0 100644
> --- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> +++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> @@ -6,13 +6,16 @@ import org.apache.s4.base.Event;
>  import org.apache.s4.core.App;
>  import org.apache.s4.core.ProcessingElement;
>  import org.apache.s4.core.Streamable;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
>  import com.google.common.base.Splitter;
>
>  public class TopicExtractorPE extends ProcessingElement {
>
>     static private ServerSocket serverSocket;
> -    Streamable<TopicSeenEvent> downStream;
> +    Streamable<Event> downStream;
> +    static Logger logger = LoggerFactory.getLogger(TopicExtractorPE.class);
>
>     public TopicExtractorPE(App app) {
>         super(app);
> @@ -24,18 +27,22 @@ public class TopicExtractorPE extends ProcessingElement {
>
>     }
>
> -    public void setDownStream(Streamable<TopicSeenEvent> stream) {
> +    public void setDownStream(Streamable<Event> stream) {
>         this.downStream = stream;
>     }
>
>     public void onEvent(Event event) {
>         String text = event.get("statusText", String.class);
> +        logger.trace("event text [{}]", text);
>         if (text.contains("#")) {
>             Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
>                     .split(text.substring(text.indexOf("#") + 1, text.length()));
>             for (String topic : split) {
>                 String topicOnly = topic.split(" ")[0];
> -                downStream.put(new TopicSeenEvent(topicOnly, 1));
> +                Event event2 = new Event();
> +                event2.put("topic", String.class, topicOnly);
> +                event2.put("count", Integer.class, 1);
> +                downStream.put(event2);
>             }
>         }
>     }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> deleted file mode 100644
> index b28d61e..0000000
> --- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> +++ /dev/null
> @@ -1,17 +0,0 @@
> -package org.apache.s4.example.twitter;
> -
> -import org.apache.s4.base.Event;
> -
> -public class TopicSeenEvent extends Event {
> -
> -    public String topic;
> -    public int count;
> -    public String reportKey = "x";
> -
> -    public TopicSeenEvent(String topic, int count) {
> -        super();
> -        this.topic = topic;
> -        this.count = count;
> -    }
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> index cf0fb40..f6edd8c 100644
> --- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> +++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> @@ -29,13 +29,13 @@ public class TwitterCounterApp extends App {
>             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
>             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
>             @SuppressWarnings("unchecked")
> -            Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
> +            Stream<Event> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
>
>                 @Override
>                 public List<String> get(Event arg0) {
>                     return new ArrayList<String>() {
>                         {
> -                            add("x");
> +                            add("aggregationKey");
>                         }
>                     };
>                 }
> @@ -44,13 +44,13 @@ public class TwitterCounterApp extends App {
>             TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
>             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
>             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
> -            Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
> +            Stream<Event> topicSeenStream = createStream("TopicSeen", new KeyFinder<Event>() {
>
>                 @Override
> -                public List<String> get(final TopicSeenEvent arg0) {
> +                public List<String> get(final Event arg0) {
>                     return new ArrayList<String>() {
>                         {
> -                            add(arg0.topic);
> +                            add(arg0.get("topic"));
>                         }
>                     };
>                 }
>
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/resources/default.s4.properties
> ----------------------------------------------------------------------
> diff --git a/test-apps/twitter-counter/src/main/resources/default.s4.properties b/test-apps/twitter-counter/src/main/resources/default.s4.properties
> new file mode 100644
> index 0000000..d5da3f3
> --- /dev/null
> +++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
> @@ -0,0 +1,15 @@
> +comm.queue_emmiter_size = 8000
> +comm.queue_listener_size = 8000
> +cluster.hosts = localhost
> +cluster.ports = 5077
> +cluster.name = s4-adapter-cluster
> +cluster.zk_address = localhost:21810
> +cluster.zk_session_timeout = 10000
> +cluster.zk_connection_timeout = 10000
> +comm.module = org.apache.s4.core.CustomModule
> +s4.logger_level = DEBUG
> +appsDir=/tmp/deploy-test
> +tcp.partition.queue_size=1000
> +comm.timeout=100
> +comm.retry_delay=100
> +comm.retries=10
>

Mime
View raw message