incubator-s4-dev mailing list archives

From Karthik Kambatla <kkamb...@cs.purdue.edu>
Subject Re: [19/22] git commit: utility scripts + refactored twitter app - scripts for defining cluster configurations, starting nodes and uploading apps - removed custom event class for twitter app
Date Fri, 15 Jun 2012 17:51:21 GMT
I see that the changes are merged into piper. Please update your piper
branch.

Thanks
Karthik

On Fri, Jun 15, 2012 at 10:35 AM, Shailendra Mishra <shailendrah@gmail.com> wrote:

> Which branch do I refresh from to get these fixes? Should I continue
> with git checkout S4-22 or go directly to the piper branch?
> - Shailendra
>
> On Fri, Jun 15, 2012 at 9:06 AM,  <mmorel@apache.org> wrote:
> > utility scripts + refactored twitter app
> > - scripts for defining cluster configurations, starting nodes and
> > uploading apps
> > - removed custom event class for twitter app
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/91a7fff8
> > Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/91a7fff8
> > Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/91a7fff8
> >
> > Branch: refs/heads/piper
> > Commit: 91a7fff86c1727ae6852d9dd0683d1078a77c86d
> > Parents: d11f7fb
> > Author: Matthieu Morel <mmorel@apache.org>
> > Authored: Mon Mar 26 15:46:20 2012 +0200
> > Committer: Matthieu Morel <mmorel@apache.org>
> > Committed: Thu Mar 29 13:45:33 2012 +0200
> >
> > ----------------------------------------------------------------------
> >  build.gradle                                       |    3 +-
> >  s4                                                 |   44 ++++
> >  settings.gradle                                    |    5 +
> >  .../java/org/apache/s4/fixtures/CommTestUtils.java |    2 +-
> >  .../src/main/java/org/apache/s4/core/App.java      |    2 +
> >  .../src/main/java/org/apache/s4/core/Main.java     |   23 ++-
> >  .../org/apache/s4/core/adapter/AdapterMain.java    |    8 +-
> >  .../java/org/apache/s4/deploy/util/DeployApp.java  |  151 --------------
> >  .../main/java/org/apache/s4/fluent/FluentApp.java  |   11 -
> >  .../test/java/org/apache/s4/fixtures/ZKServer.java |   67 ------
> >  .../org/apache/s4/wordcount/WordClassifierPE.java  |   19 +-
> >  .../src/test/resources/default.s4.properties       |    9 +-
> >  .../resources/org.apache.s4.deploy.s4.properties   |    4 +-
> >  subprojects/s4-tools/s4-tools.gradle               |   48 +++++
> >  .../java/org/apache/s4/tools/DefineCluster.java    |   65 ++++++
> >  .../src/main/java/org/apache/s4/tools/Deploy.java  |  162 +++++++++++++++
> >  .../src/main/java/org/apache/s4/tools/Tools.java   |   22 ++
> >  .../main/java/org/apache/s4/tools/ZKServer.java    |   75 +++++++
> >  .../s4-counter/src/main/java/s4app/ClockApp.java   |   25 +--
> >  .../src/main/java/s4app/ShowTimeApp.java           |   26 +--
> >  .../main/java/org/apache/s4/deploy/SimplePE.java   |    2 +-
> >  .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
> >  .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
> >  test-apps/twitter-adapter/README.txt               |    1 +
> >  test-apps/twitter-adapter/build.gradle             |    3 +
> >  .../s4/example/twitter/TwitterInputAdapter.java    |   19 ++-
> >  .../src/main/resources/default.s4.properties       |   18 --
> >  .../src/main/resources/s4.properties               |   18 ++
> >  .../src/main/resources/twitter4j.properties        |    5 -
> >  test-apps/twitter-counter/README.txt               |   33 +++
> >  test-apps/twitter-counter/build.gradle             |    1 +
> >  .../org/apache/s4/example/twitter/TopNTopicPE.java |   16 +-
> >  .../s4/example/twitter/TopicCountAndReportPE.java  |   26 ++-
> >  .../s4/example/twitter/TopicExtractorPE.java       |   13 +-
> >  .../apache/s4/example/twitter/TopicSeenEvent.java  |   17 --
> >  .../s4/example/twitter/TwitterCounterApp.java      |   10 +-
> >  .../src/main/resources/default.s4.properties       |   15 ++
> >  37 files changed, 602 insertions(+), 370 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/build.gradle
> > ----------------------------------------------------------------------
> > diff --git a/build.gradle b/build.gradle
> > index 7db43d7..fd75057 100644
> > --- a/build.gradle
> > +++ b/build.gradle
> > @@ -72,7 +72,8 @@ libraries = [
> >     junit:              'junit:junit:4.10',
> >     zkclient:           'com.github.sgroschupf:zkclient:0.1',
> >     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
> > -    jcommander:         'com.beust:jcommander:1.23'
> > +    jcommander:         'com.beust:jcommander:1.23',
> > +    commons_io:         'commons-io:commons-io:2.1'
> >  ]
> >
> >  subprojects {
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/s4
> > ----------------------------------------------------------------------
> > diff --git a/s4 b/s4
> > new file mode 100755
> > index 0000000..a958cc2
> > --- /dev/null
> > +++ b/s4
> > @@ -0,0 +1,44 @@
> > +#!/bin/bash
> > +
> > +# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
> > +
> > +echo $0
> > +echo $1
> > +
> > +GRADLE=`pwd`/gradlew
> > +
> > +case "$1" in
> > +"deploy")
> > +# examples:
> > +# ./s4 deploy -appName=twitter-counter -buildFile=<s4-dir>/test-apps/twitter-counter/build.gradle -cluster=s4-test-cluster
> > +# ./s4 deploy -appName=twitter-adapter -buildFile=<s4-dir>/test-apps/twitter-adapter/build.gradle -cluster=s4-adapter-cluster
> > +       shift
> > +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
> > +;;
> > +"zkServer")
> > +       shift
> > +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
> > +;;
> > +"newCluster")
> > +# examples:
> > +#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
> > +       shift
> > +    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
> > +;;
> > +"appNode")
> > +# example:
> > +# ./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
> > +       shift
> > +       subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
> > +;;
> > +"adapterNode")
> > +# example:
> > +# ./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
> > +       shift
> > +       subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
> > +;;
> > +
> > +
> > +
> > +esac
> > +
> >
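Inline note on the s4 script above: each subcommand is forwarded to one main class through the s4-tools launcher. A minimal Java sketch of that mapping may make it easier to scan. The class S4Subcommands and its method mainClassFor are hypothetical names used only for illustration and are not part of the commit; the subcommand names and class names are copied from the case statement.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of the commit: it only mirrors the
// case statement of the s4 script as a lookup table.
final class S4Subcommands {

    private static final Map<String, String> MAIN_CLASSES = new LinkedHashMap<String, String>();

    static {
        // Subcommand -> main class, as listed in the s4 script.
        MAIN_CLASSES.put("deploy", "org.apache.s4.tools.Deploy");
        MAIN_CLASSES.put("zkServer", "org.apache.s4.tools.ZKServer");
        MAIN_CLASSES.put("newCluster", "org.apache.s4.tools.DefineCluster");
        MAIN_CLASSES.put("appNode", "org.apache.s4.core.Main");
        MAIN_CLASSES.put("adapterNode", "org.apache.s4.core.adapter.AdapterMain");
    }

    /** Returns the main class for a subcommand, or null when the script has no matching case. */
    static String mainClassFor(String subcommand) {
        return MAIN_CLASSES.get(subcommand);
    }

    public static void main(String[] args) {
        // Print the whole table, one subcommand per line.
        for (Map.Entry<String, String> entry : MAIN_CLASSES.entrySet()) {
            System.out.println("./s4 " + entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

As in the script, an unknown subcommand matches nothing, which the sketch models by returning null.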
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/settings.gradle
> > ----------------------------------------------------------------------
> > diff --git a/settings.gradle b/settings.gradle
> > index a5655c4..33827ba 100644
> > --- a/settings.gradle
> > +++ b/settings.gradle
> > @@ -17,8 +17,13 @@ include 's4-base'
> >  include 's4-core'
> >  include 's4-comm'
> >  include 's4-example'
> > +include 's4-tools'
> > +//include 's4-example'
> > +//include ':test-apps:simple-adapter-1'
> >  include ':test-apps:simple-deployable-app-1'
> >  include ':test-apps:simple-deployable-app-2'
> > +include ':test-apps:s4-showtime'
> > +include ':test-apps:s4-counter'
> >
> >  rootProject.name = 's4'
> >  rootProject.children.each {project ->
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> > index e0a9456..7d7913d 100644
> > --- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> > +++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
> > @@ -38,7 +38,7 @@ public class CommTestUtils {
> >
> >     private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
> >
> > -    public static final int ZK_PORT = 21810;
> > +    public static final int ZK_PORT = 2181;
> >     public static final int INITIAL_BOOKIE_PORT = 5000;
> >     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
> >     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> > index 2575013..dd46439 100644
> > --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> > +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
> > @@ -18,6 +18,7 @@ package org.apache.s4.core;
> >  import java.util.ArrayList;
> >  import java.util.Arrays;
> >  import java.util.List;
> > +import java.util.Map;
> >  import java.util.concurrent.TimeUnit;
> >
> >  import org.apache.s4.base.Event;
> > @@ -25,6 +26,7 @@ import org.apache.s4.base.KeyFinder;
> >  import org.slf4j.Logger;
> >  import org.slf4j.LoggerFactory;
> >
> > +import com.google.common.collect.Maps;
> >  import com.google.inject.AbstractModule;
> >  import com.google.inject.Guice;
> >  import com.google.inject.Inject;
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> > index 71756a4..c18603c 100644
> > --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> > +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
> > @@ -29,16 +29,19 @@ public class Main {
> >      * @param args
> >      */
> >     public static void main(String[] args) {
> > -
> > -        if (args.length == 0) {
> > -            logger.info("Starting S4 node with default configuration");
> > -            startDefaultS4Node();
> > -        } else if (args.length == 1) {
> > -            logger.info("Starting S4 node with custom configuration from file {}", args[0]);
> > -            startCustomS4Node(args[0]);
> > -        } else {
> > -            logger.info("Starting S4 node in development mode");
> > -            startDevelopmentMode(args);
> > +        try {
> > +            if (args.length == 0) {
> > +                logger.info("Starting S4 node with default configuration");
> > +                startDefaultS4Node();
> > +            } else if (args.length == 1) {
> > +                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
> > +                startCustomS4Node(args[0]);
> > +            } else {
> > +                logger.info("Starting S4 node in development mode");
> > +                startDevelopmentMode(args);
> > +            }
> > +        } catch (Exception e) {
> > +            logger.error("Cannot start S4 node", e);
> >         }
> >     }
> >
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> > index a1c496c..f027f06 100644
> > --- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> > +++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
> > @@ -46,13 +46,7 @@ public class AdapterMain {
> >     @Parameters(separators = "=")
> >     static class AdapterArgs {
> >
> > -        @Parameter(names = "-moduleClass", description = "module class name")
> > -        String moduleClass;
> > -
> > -        @Parameter(names = "-adapterClass", description = "adapter class name")
> > -        String adapterClass;
> > -
> > -        @Parameter(names = "-s4Properties", description = "s4 properties file path")
> > +        @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
> >         String s4PropertiesFilePath;
> >     }
> >
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> > deleted file mode 100644
> > index d8b77a4..0000000
> > --- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
> > +++ /dev/null
> > @@ -1,151 +0,0 @@
> > -package org.apache.s4.deploy.util;
> > -
> > -import java.io.BufferedReader;
> > -import java.io.File;
> > -import java.io.IOException;
> > -import java.io.InputStreamReader;
> > -import java.util.ArrayList;
> > -import java.util.Arrays;
> > -import java.util.List;
> > -
> > -import junit.framework.Assert;
> > -
> > -import org.I0Itec.zkclient.ZkClient;
> > -import org.apache.log4j.BasicConfigurator;
> > -import org.apache.log4j.Level;
> > -import org.apache.log4j.Logger;
> > -import org.apache.s4.comm.topology.ZNRecord;
> > -import org.apache.s4.comm.topology.ZNRecordSerializer;
> > -import org.apache.s4.deploy.DistributedDeploymentManager;
> > -import org.apache.zookeeper.CreateMode;
> > -import org.slf4j.LoggerFactory;
> > -
> > -import com.beust.jcommander.JCommander;
> > -import com.beust.jcommander.Parameter;
> > -import com.beust.jcommander.Parameters;
> > -import com.google.common.io.ByteStreams;
> > -import com.google.common.io.Files;
> > -
> > -public class DeployApp {
> > -
> > -    private static File tmpAppsDir;
> > -
> > -    /**
> > -     * @param args
> > -     */
> > -    public static void main(String[] args) {
> > -
> > -        DeployAppArgs appArgs = new DeployAppArgs();
> > -        JCommander jc = new JCommander(appArgs);
> > -        // configure log4j for Zookeeper
> > -        BasicConfigurator.configure();
> > -        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> > -        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> > -
> > -        try {
> > -            jc.parse(args);
> > -        } catch (Exception e) {
> > -            jc.usage();
> > -            System.exit(-1);
> > -        }
> > -        try {
> > -            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
> > -            zkClient.setZkSerializer(new ZNRecordSerializer());
> > -
> > -            tmpAppsDir = Files.createTempDir();
> > -
> > -            // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
> > -
> > -            // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
> > -            // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> > -            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
> > -                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> > -
> > -            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
> > -
> > -            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
> > -                    + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
> > -
> > -            final String uri = s4rToDeploy.toURI().toString();
> > -            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
> > -            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
> > -            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
> > -
> > -        } catch (Exception e) {
> > -            LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
> > -        }
> > -
> > -    }
> > -
> > -    @Parameters(separators = "=")
> > -    static class DeployAppArgs {
> > -
> > -        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
> > -        String gradleExecPath;
> > -
> > -        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
> > -        String gradleBuildFilePath;
> > -
> > -        @Parameter(names = "-appName", description = "name of S4 application", required = true)
> > -        String appName;
> > -
> > -        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
> > -        String clusterName;
> > -
> > -        @Parameter(names = "-zk", description = "zookeeper connection string")
> > -        String zkConnectionString = "localhost:2181";
> > -
> > -    }
> > -
> > -    static class ExecGradle {
> > -
> > -        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
> > -                throws Exception {
> > -            List<String> cmdList = new ArrayList<String>();
> > -            cmdList.add(gradlewExecPath);
> > -            // cmdList.add("-c");
> > -            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
> > -            cmdList.add("-b");
> > -            cmdList.add(buildFilePath);
> > -            cmdList.add(taskName);
> > -            if (params.length > 0) {
> > -                for (int i = 0; i < params.length; i++) {
> > -                    cmdList.add("-P" + params[i]);
> > -                }
> > -            }
> > -
> > -            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
> > -            ProcessBuilder pb = new ProcessBuilder(cmdList);
> > -
> > -            pb.directory(new File(buildFilePath).getParentFile());
> > -            pb.redirectErrorStream();
> > -
> > -            final Process process = pb.start();
> > -            new Thread(new Runnable() {
> > -                @Override
> > -                public void run() {
> > -                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
> > -                    String line;
> > -                    try {
> > -                        line = br.readLine();
> > -                        while (line != null) {
> > -                            System.out.println(line);
> > -                            line = br.readLine();
> > -                        }
> > -                    } catch (IOException e) {
> > -                        throw new RuntimeException(e);
> > -                    }
> > -                }
> > -            }).start();
> > -            process.waitFor();
> > -
> > -            // try {
> > -            // int exitValue = process.exitValue();
> > -            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
> > -            // } catch (IllegalThreadStateException ignored) {
> > -            // }
> > -
> > -        }
> > -    }
> > -
> > -}
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> > index 9f66e0d..670c375 100644
> > --- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> > +++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
> > @@ -35,15 +35,4 @@ public class FluentApp extends App {
> >         appMaker.close();
> >     }
> >
> > -    public void start() {
> > -        super.start();
> > -    }
> > -
> > -    public void init() {
> > -        super.init();
> > -    }
> > -
> > -    public void close() {
> > -        super.close();
> > -    }
> >  }
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> > deleted file mode 100644
> > index b79f1a0..0000000
> > --- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
> > +++ /dev/null
> > @@ -1,67 +0,0 @@
> > -package org.apache.s4.fixtures;
> > -
> > -import java.util.Arrays;
> > -
> > -import org.apache.s4.comm.tools.TaskSetup;
> > -import org.slf4j.Logger;
> > -import org.slf4j.LoggerFactory;
> > -
> > -import com.beust.jcommander.JCommander;
> > -import com.beust.jcommander.Parameter;
> > -import com.beust.jcommander.Parameters;
> > -
> > -public class ZKServer {
> > -
> > -    private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
> > -
> > -    /**
> > -     * @param args
> > -     */
> > -    public static void main(String[] args) {
> > -        ZKServerArgs clusterArgs = new ZKServerArgs();
> > -        JCommander jc = new JCommander(clusterArgs);
> > -        try {
> > -            jc.parse(args);
> > -        } catch (Exception e) {
> > -            System.out.println(Arrays.toString(args));
> > -            e.printStackTrace();
> > -            jc.usage();
> > -            System.exit(-1);
> > -        }
> > -        try {
> > -
> > -            logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
> > -                    clusterArgs.nbTasks);
> > -            if (clusterArgs.startZK) {
> > -                CommTestUtils.startZookeeperServer();
> > -            }
> > -            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
> > -            taskSetup.clean(clusterArgs.clusterName);
> > -            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
> > -            logger.info("Zookeeper started");
> > -        } catch (Exception e) {
> > -            logger.error("Cannot initialize zookeeper with specified configuration", e);
> > -        }
> > -
> > -    }
> > -
> > -    @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
> > -    static class ZKServerArgs {
> > -
> > -        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
> > -        String clusterName = "s4-test-cluster";
> > -
> > -        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
> > -        int nbTasks = 1;
> > -
> > -        @Parameter(names = "-zk", description = "Zookeeper connection string")
> > -        String zkConnectionString = "localhost:21810";
> > -
> > -        @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
> > -        boolean startZK = false;
> > -
> > -        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
> > -        int firstListeningPort = -1;
> > -    }
> > -
> > -}
> >
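Inline note on the -firstListeningPort description above ("First node listens on the specified port, other nodes listen on port initial + nodeIndex"): a minimal Java sketch of that rule, assuming nodeIndex starts at 0 for the first node. ListeningPorts and forCluster are hypothetical names for illustration only. The actual assignment happens inside TaskSetup.setup, whose implementation is not shown in this diff, so this is one reading of the description and not the project's code.

```java
import java.util.Arrays;

// Hypothetical helper, NOT part of S4: illustrates the port rule stated in the
// -firstListeningPort parameter description.
final class ListeningPorts {

    /**
     * Returns one port per node: firstListeningPort + nodeIndex.
     * Assumption: nodeIndex is 0 for the first node.
     */
    static int[] forCluster(int firstListeningPort, int nbTasks) {
        if (nbTasks < 0) {
            throw new IllegalArgumentException("nbTasks must not be negative: " + nbTasks);
        }
        int[] ports = new int[nbTasks];
        for (int nodeIndex = 0; nodeIndex < nbTasks; nodeIndex++) {
            ports[nodeIndex] = firstListeningPort + nodeIndex;
        }
        return ports;
    }

    public static void main(String[] args) {
        // Values from the script example: -firstListeningPort=11000 -nbTasks=2
        System.out.println(Arrays.toString(forCluster(11000, 2))); // prints [11000, 11001]
    }
}
```

Under this reading, the two example clusters in the s4 script (first ports 11000 and 13000) would not overlap for small values of nbTasks.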
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> > index 820dbe5..dffe6cb 100644
> > --- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> > +++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
> > @@ -1,6 +1,5 @@
> >  package org.apache.s4.wordcount;
> >
> > -
> >  import java.io.File;
> >  import java.io.IOException;
> >  import java.util.Map.Entry;
> > @@ -18,25 +17,25 @@ import org.apache.zookeeper.Watcher;
> >  import org.apache.zookeeper.ZooDefs.Ids;
> >  import org.apache.zookeeper.ZooKeeper;
> >
> > -
> >  public class WordClassifierPE extends ProcessingElement implements Watcher {
> >
> >     TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
> >     private int counter;
> >     transient private ZooKeeper zk;
> >
> > -    private WordClassifierPE () {}
> > +    private WordClassifierPE() {
> > +    }
> >
> >     public WordClassifierPE(App app) {
> >         super(app);
> >     }
> > -
> > +
> >     public void onEvent(WordCountEvent event) {
> >         try {
> >             WordCountEvent wcEvent = event;
> >             if (zk == null) {
> >                 try {
> > -                    zk = new ZooKeeper("localhost:21810", 4000, this);
> > +                    zk = new ZooKeeper("localhost:2181", 4000, this);
> >                 } catch (IOException e) {
> >                     throw new RuntimeException(e);
> >                 }
> > @@ -73,8 +72,8 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
> >                 // zookeeper
> >                 zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
> >                         CreateMode.PERSISTENT);
> > -                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
> > -                System.out.println("wrote classifier iteration ["+counter+"]");
> > +                Logger.getLogger("s4-ft").debug("wrote classifier iteration [" + counter + "]");
> > +                System.out.println("wrote classifier iteration [" + counter + "]");
> >                 // check if we are allowed to continue
> >                 if (null == zk.exists("/continue_" + counter, null)) {
> >                     CountDownLatch latch = new CountDownLatch(1);
> > @@ -96,19 +95,19 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
> >     @Override
> >     public void process(WatchedEvent event) {
> >         // TODO Auto-generated method stub
> > -
> > +
> >     }
> >
> >     @Override
> >     protected void onCreate() {
> >         // TODO Auto-generated method stub
> > -
> > +
> >     }
> >
> >     @Override
> >     protected void onRemove() {
> >         // TODO Auto-generated method stub
> > -
> > +
> >     }
> >
> >  }
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/default.s4.properties
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
> > index 62fc7d5..2235032 100644
> > --- a/subprojects/s4-core/src/test/resources/default.s4.properties
> > +++ b/subprojects/s4-core/src/test/resources/default.s4.properties
> > @@ -4,6 +4,13 @@ cluster.hosts = localhost
> >  cluster.ports = 5077
> >  cluster.lock_dir = {user.dir}/tmp
> >  cluster.name = s4-test-cluster
> > -cluster.zk_address = localhost:21810
> > +cluster.zk_address = localhost:2181
> >  cluster.zk_session_timeout = 10000
> >  cluster.zk_connection_timeout = 10000
> > +s4.logger_level = DEBUG
> > +comm.module = org.apache.s4.core.CustomModule
> > +appsDir=/tmp/deploy-test
> > +tcp.partition.queue_size=1000
> > +comm.timeout=100
> > +comm.retry_delay=100
> > +comm.retries=10
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> > index 9a5d11a..3a566f2 100644
> > --- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> > +++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
> > @@ -3,9 +3,9 @@ comm.queue_listener_size = 8000
> >  cluster.hosts = localhost
> >  cluster.ports = 5077
> >  cluster.name = s4-test-cluster
> > -cluster.zk_address = localhost:21810
> > +cluster.zk_address = localhost:2181
> >  cluster.zk_session_timeout = 10000
> >  cluster.zk_connection_timeout = 10000
> >  comm.module = org.apache.s4.core.adapter.AdapterModule
> > -s4.logger_level = DEBUG
> > +s4.logger_level = TRACE
> >  appsDir=/tmp/deploy-test
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/s4-tools.gradle
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
> > new file mode 100644
> > index 0000000..68c728e
> > --- /dev/null
> > +++ b/subprojects/s4-tools/s4-tools.gradle
> > @@ -0,0 +1,48 @@
> > +/*
> > + * Copyright 2010 the original author or authors.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +description = 'The S4 core platform.'
> > +
> > +apply plugin: 'java'
> > +
> > +task "create-dirs" << {
> > +   sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
> > +   sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
> > +}
> > +
> > +dependencies {
> > +    compile project(":s4-base")
> > +    compile project(":s4-comm")
> > +    compile project(":s4-core")
> > +    compile libraries.jcommander
> > +    compile libraries.zkclient
> > +    compile libraries.commons_io
> > +}
> > +
> > +apply plugin:'application'
> > +mainClassName = "org.apache.s4.tools.Tools"
> > +
> > +run {
> > +    // run doesn't yet directly accept command line parameters...
> > +    if ( project.hasProperty('args') ) {
> > +        args project.args.split('\\s+')
> > +        print args
> > +    }
> > + }
> > +
> > +test {
> > +    forkEvery=1
> > +}
> >
> > http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> > ----------------------------------------------------------------------
> > diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> > new file mode 100644
> > index 0000000..4fc09f0
> > --- /dev/null
> > +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
> > @@ -0,0 +1,65 @@
> > +package org.apache.s4.tools;
> > +
> > +import java.util.Arrays;
> > +
> > +import org.apache.log4j.BasicConfigurator;
> > +import org.apache.log4j.Level;
> > +import org.apache.s4.comm.tools.TaskSetup;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import com.beust.jcommander.JCommander;
> > +import com.beust.jcommander.Parameter;
> > +import com.beust.jcommander.Parameters;
> > +
> > +public class DefineCluster {
> > +
> > +    static Logger logger = LoggerFactory.getLogger(DefineCluster.class);
> > +
> > +    public static void main(String[] args) {
> > +        // configure log4j for Zookeeper
> > +        BasicConfigurator.configure();
> > +        org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> > +        org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> > +
> > +        ZKServerArgs clusterArgs = new ZKServerArgs();
> > +        JCommander jc = new JCommander(clusterArgs);
> > +        try {
> > +            jc.parse(args);
> > +        } catch (Exception e) {
> > +            System.out.println(Arrays.toString(args));
> > +            e.printStackTrace();
> > +            jc.usage();
> > +            System.exit(-1);
> > +        }
> > +        try {
> > +
> > +            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbTasks);
> > +
> > +            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
> > +            taskSetup.clean(clusterArgs.clusterName);
> > +            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
> > +            logger.info("New cluster configuration uploaded into zookeeper");
> > +        } catch (Exception e) {
> > +            logger.error("Cannot initialize zookeeper with specified configuration", e);
> > +        }
> > +
> > +    }
> > +
> > +    @Parameters(separators = "=", commandDescription = "Setup new S4 logical cluster")
> > +    static class ZKServerArgs {
> > +
> > +        @Parameter(names = "-name", description = "S4 cluster name", required = true)
> > +        String clusterName = "s4-test-cluster";
> > +
> > +        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
> > +        int nbTasks = 1;
> > +
> > +        @Parameter(names = "-zk", description = "Zookeeper connection string")
> > +        String zkConnectionString = "localhost:2181";
> > +
> > +        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
> > +        int firstListeningPort = -1;
> > +    }
> > +
> > +}
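
The -firstListeningPort description above states the port rule in words. A minimal sketch of that rule follows, assuming node indices start at 0; ListeningPorts and assign are hypothetical names for illustration, not part of S4, and TaskSetup may compute ports differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of S4: it only illustrates the port rule
// described by the -firstListeningPort option.
class ListeningPorts {

    // Node i (0-based, an assumption) listens on firstListeningPort + i.
    static List<Integer> assign(int firstListeningPort, int nbTasks) {
        if (firstListeningPort < 1 || nbTasks < 1) {
            throw new IllegalArgumentException("port and task count must be positive");
        }
        List<Integer> ports = new ArrayList<>();
        for (int nodeIndex = 0; nodeIndex < nbTasks; nodeIndex++) {
            ports.add(firstListeningPort + nodeIndex);
        }
        return ports;
    }

    public static void main(String[] args) {
        // prints [10000, 10001, 10002]
        System.out.println(assign(10000, 3));
    }
}
```

Rejecting non-positive values early would also catch the -1 default of firstListeningPort if the option were ever made optional.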
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> > ----------------------------------------------------------------------
> > diff --git
> a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> > new file mode 100644
> > index 0000000..1b2eb7d
> > --- /dev/null
> > +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
> > @@ -0,0 +1,162 @@
> > +package org.apache.s4.tools;
> > +
> > +import java.io.BufferedReader;
> > +import java.io.File;
> > +import java.io.IOException;
> > +import java.io.InputStreamReader;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.List;
> > +
> > +import junit.framework.Assert;
> > +
> > +import org.I0Itec.zkclient.ZkClient;
> > +import org.apache.log4j.BasicConfigurator;
> > +import org.apache.log4j.Level;
> > +import org.apache.log4j.Logger;
> > +import org.apache.s4.comm.topology.ZNRecord;
> > +import org.apache.s4.comm.topology.ZNRecordSerializer;
> > +import org.apache.s4.deploy.DistributedDeploymentManager;
> > +import org.apache.zookeeper.CreateMode;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import com.beust.jcommander.JCommander;
> > +import com.beust.jcommander.Parameter;
> > +import com.beust.jcommander.Parameters;
> > +import com.google.common.io.ByteStreams;
> > +import com.google.common.io.Files;
> > +
> > +public class Deploy {
> > +
> > +    private static File tmpAppsDir;
> > +    static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
> > +
> > +    /**
> > +     * @param args
> > +     */
> > +    public static void main(String[] args) {
> > +
> > +        DeployAppArgs appArgs = new DeployAppArgs();
> > +        JCommander jc = new JCommander(appArgs);
> > +        // configure log4j for Zookeeper
> > +        BasicConfigurator.configure();
> > +        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
> > +        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
> > +
> > +        try {
> > +            jc.parse(args);
> > +        } catch (Exception e) {
> > +            e.printStackTrace();
> > +            jc.usage();
> > +            System.exit(-1);
> > +        }
> > +        try {
> > +            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString, appArgs.timeout);
> > +            zkClient.setZkSerializer(new ZNRecordSerializer());
> > +
> > +            tmpAppsDir = Files.createTempDir();
> > +
> > +            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
> > +
> > +            String generatedS4RPath = null;
> > +
> > +            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
> > +                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
> > +            generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r";
> > +
> > +            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
> > +                    Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
> > +
> > +            final String uri = s4rToDeploy.toURI().toString();
> > +            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
> > +            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
> > +            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
> > +            logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
> > +                    appArgs.appName, appArgs.clusterName, "/" + appArgs.clusterName + "/apps/" + appArgs.appName });
> > +
> > +        } catch (Exception e) {
> > +            LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
> > +        }
> > +
> > +    }
> > +
> > +    @Parameters(separators = "=")
> > +    static class DeployAppArgs {
> > +
> > +        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
> > +        String gradleExecPath;
> > +
> > +        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
> > +        String gradleBuildFilePath;
> > +
> > +        @Parameter(names = "-appName", description = "name of S4 application", required = true)
> > +        String appName;
> > +
> > +        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
> > +        String clusterName;
> > +
> > +        @Parameter(names = "-zk", description = "zookeeper connection string")
> > +        String zkConnectionString = "localhost:2181";
> > +
> > +        @Parameter(names = "-timeout", description = "connection timeout to Zookeeper, in ms")
> > +        int timeout = 10000;
> > +
> > +    }
> > +
> > +    static class ExecGradle {
> > +
> > +        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
> > +                throws Exception {
> > +            // Thread.sleep(10000);
> > +            List<String> cmdList = new ArrayList<String>();
> > +            // cmdList.add("sleep");
> > +            // cmdList.add("2");
> > +            // cmdList.add(";");
> > +            cmdList.add(gradlewExecPath);
> > +            // cmdList.add("-c");
> > +            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
> > +            cmdList.add("-b");
> > +            cmdList.add(buildFilePath);
> > +            cmdList.add(taskName);
> > +            cmdList.add("-stacktrace");
> > +            if (params.length > 0) {
> > +                for (int i = 0; i < params.length; i++) {
> > +                    cmdList.add("-P" + params[i]);
> > +                }
> > +            }
> > +            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
> > +            ProcessBuilder pb = new ProcessBuilder(cmdList);
> > +
> > +            pb.directory(new File(buildFilePath).getParentFile());
> > +            pb.redirectErrorStream(true);
> > +
> > +            final Process process = pb.start();
> > +            new Thread(new Runnable() {
> > +                @Override
> > +                public void run() {
> > +                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
> > +                    String line;
> > +                    try {
> > +                        line = br.readLine();
> > +                        while (line != null) {
> > +                            System.out.println(line);
> > +                            line = br.readLine();
> > +                        }
> > +                    } catch (IOException e) {
> > +                        throw new RuntimeException(e);
> > +                    }
> > +                }
> > +            }).start();
> > +
> > +            process.waitFor();
> > +
> > +            // try {
> > +            // int exitValue = process.exitValue();
> > +            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
> > +            // } catch (IllegalThreadStateException ignored) {
> > +            // }
> > +
> > +        }
> > +    }
> > +
> > +}
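
A note on ExecGradle: the no-argument ProcessBuilder.redirectErrorStream() only reads the flag, so merging stderr into stdout requires redirectErrorStream(true). The sketch below separates command assembly from process setup so that both can be checked without launching Gradle. GradleCommand, build and prepare are hypothetical names for illustration, not part of this commit, and the sketch leaves out the stacktrace flag that the commit passes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of S4: shows how the Gradle command line
// used by Deploy could be assembled and prepared for execution.
class GradleCommand {

    // Builds: <gradle> -b <buildFile> <task> -P<param>...
    static List<String> build(String gradleExecPath, String buildFilePath, String taskName, String... params) {
        List<String> cmd = new ArrayList<>();
        cmd.add(gradleExecPath);
        cmd.add("-b");
        cmd.add(buildFilePath);
        cmd.add(taskName);
        for (String param : params) {
            cmd.add("-P" + param);
        }
        return cmd;
    }

    // Prepares the process without starting it; stderr is merged into stdout
    // so that a single reader thread can drain all output.
    static ProcessBuilder prepare(List<String> cmd) {
        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.redirectErrorStream(true);
        return pb;
    }

    public static void main(String[] args) {
        List<String> cmd = build("./gradlew", "app/build.gradle", "installS4R", "appsDir=/tmp/apps");
        System.out.println(String.join(" ", cmd));
        System.out.println("stderr merged: " + prepare(cmd).redirectErrorStream());
    }
}
```

With the streams merged, the single reader thread in ExecGradle is enough to keep the child process from blocking on a full stderr pipe.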
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> > ----------------------------------------------------------------------
> > diff --git
> a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> > new file mode 100644
> > index 0000000..ae53cb6
> > --- /dev/null
> > +++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
> > @@ -0,0 +1,22 @@
> > +package org.apache.s4.tools;
> > +
> > +import java.lang.reflect.Method;
> > +import java.util.Arrays;
> > +
> > +public class Tools {
> > +
> > +    public static void main(String[] args) {
> > +        try {
> > +            Class<?> toolClass = Class.forName(args[0]);
> > +            Method main = toolClass.getMethod("main", String[].class);
> > +            if (args.length > 1) {
> > +                main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
> > +            } else {
> > +                main.invoke(null, new Object[] { new String[0] });
> > +            }
> > +        } catch (Exception e) {
> > +            // TODO Auto-generated catch block
> > +            e.printStackTrace();
> > +        }
> > +    }
> > +}
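
The Tools class dispatches to another class's main method by reflection. A minimal sketch of the same idea follows, with two changes that are suggestions rather than what the commit does: an explicit check for a missing class name, and a single code path, since Arrays.copyOfRange(args, 1, args.length) already yields an empty array when only the class name is given. Dispatcher and EchoTool are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical sketch, not part of S4: reflective dispatch to a tool's main method.
class Dispatcher {

    // Stand-in tool that records the arguments it receives.
    static class EchoTool {
        static String[] last = new String[0];

        public static void main(String[] args) {
            last = args;
        }
    }

    // args[0] is the tool class name; the remaining elements go to its main method.
    static void dispatch(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: <tool class> [tool args...]");
        }
        try {
            Class<?> toolClass = Class.forName(args[0]);
            Method main = toolClass.getMethod("main", String[].class);
            String[] toolArgs = Arrays.copyOfRange(args, 1, args.length);
            // The cast keeps the array from being spread as varargs.
            main.invoke(null, (Object) toolArgs);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot run tool " + args[0], e);
        }
    }

    public static void main(String[] args) {
        dispatch(new String[] { EchoTool.class.getName(), "-name=s4-test-cluster", "-nbTasks=1" });
        System.out.println(Arrays.toString(EchoTool.last));
    }
}
```

Rethrowing instead of only printing the stack trace lets a wrapper script see a non-zero exit status when a tool name is mistyped.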
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> > new file mode 100644
> > index 0000000..cb4c85c
> > --- /dev/null
> > +++
> b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
> > @@ -0,0 +1,75 @@
> > +package org.apache.s4.tools;
> > +
> > +import java.io.File;
> > +import java.util.Arrays;
> > +
> > +import org.I0Itec.zkclient.IDefaultNameSpace;
> > +import org.I0Itec.zkclient.ZkClient;
> > +import org.I0Itec.zkclient.ZkServer;
> > +import org.apache.commons.io.FileUtils;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +import com.beust.jcommander.JCommander;
> > +import com.beust.jcommander.Parameter;
> > +import com.beust.jcommander.Parameters;
> > +
> > +public class ZKServer {
> > +
> > +    static Logger logger = LoggerFactory.getLogger(ZKServer.class);
> > +
> > +    public static void main(String[] args) {
> > +        ZKServerArgs zkArgs = new ZKServerArgs();
> > +        JCommander jc = new JCommander(zkArgs);
> > +        try {
> > +            jc.parse(args);
> > +        } catch (Exception e) {
> > +            System.out.println(Arrays.toString(args));
> > +            e.printStackTrace();
> > +            jc.usage();
> > +            System.exit(-1);
> > +        }
> > +        try {
> > +
> > +            logger.info("Starting zookeeper server on port [{}]", zkArgs.zkPort);
> > +
> > +            if (zkArgs.clean) {
> > +                logger.info("cleaning existing data in [{}] and [{}]", zkArgs.dataDir, zkArgs.logDir);
> > +                FileUtils.deleteDirectory(new File(zkArgs.dataDir));
> > +                FileUtils.deleteDirectory(new File(zkArgs.logDir));
> > +            }
> > +            IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
> > +
> > +                @Override
> > +                public void createDefaultNameSpace(ZkClient zkClient) {
> > +
> > +                }
> > +            };
> > +
> > +            ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace);
> > +            zkServer.start();
> > +        } catch (Exception e) {
> > +            logger.error("Cannot initialize zookeeper with specified configuration", e);
> > +        }
> > +    }
> > +
> > +    @Parameters(separators = "=", commandDescription = "Start Zookeeper server")
> > +    static class ZKServerArgs {
> > +
> > +        @Parameter(names = "-port", description = "Zookeeper port")
> > +        String zkPort = "2181";
> > +
> > +        @Parameter(names = "-dataDir", description = "data directory", required = false)
> > +        String dataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
> > +                + "zookeeper" + File.separator + "data").getAbsolutePath();
> > +
> > +        @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
> > +        boolean clean = true;
> > +
> > +        @Parameter(names = "-logDir", description = "log directory")
> > +        String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
> > +                + "zookeeper" + File.separator + "log").getAbsolutePath();
> > +
> > +    }
> > +
> > +}
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> > index 316b876..cab4eab 100644
> > --- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> > +++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
> > @@ -12,44 +12,27 @@ public class ClockApp extends App {
> >     private ClockPE clockPE;
> >
> >     @Override
> > -    protected void start() {
> > +    protected void onStart() {
> >         System.out.println("Starting CounterApp...");
> >         clockPE.getInstanceForKey("single");
> >     }
> >
> >     // generic array due to varargs generates a warning.
> >     @Override
> > -    protected void init() {
> > +    protected void onInit() {
> >         System.out.println("Initing CounterApp...");
> >
> >         clockPE = new ClockPE(this);
> >         clockPE.setTimerInterval(1, TimeUnit.SECONDS);
> >
> > -        eventSource = new EventSource(this, "I can give you the time!");
> > +        eventSource = new EventSource(this, "clockStream");
> >         clockPE.setStreams((Streamable) eventSource);
> >     }
> >
> >     @Override
> > -    protected void close() {
> > +    protected void onClose() {
> >         System.out.println("Closing CounterApp...");
> >         eventSource.close();
> >     }
> >
> > -    @Override
> > -    protected void onStart() {
> > -        // TODO Auto-generated method stub
> > -
> > -    }
> > -
> > -    @Override
> > -    protected void onInit() {
> > -        // TODO Auto-generated method stub
> > -
> > -    }
> > -
> > -    @Override
> > -    protected void onClose() {
> > -        // TODO Auto-generated method stub
> > -
> > -    }
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> > index 1c92c57..6652f59 100644
> > --- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> > +++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
> > @@ -7,41 +7,23 @@ public class ShowTimeApp extends App {
> >     private ShowPE showPE;
> >
> >     @Override
> > -    protected void start() {
> > +    protected void onStart() {
> >         System.out.println("Starting ShowTimeApp...");
> >         showPE.getInstanceForKey("single");
> >     }
> >
> >     @Override
> > -    protected void init() {
> > +    protected void onInit() {
> >         System.out.println("Initing ShowTimeApp...");
> >
> >         showPE = new ShowPE(this);
> >
> >         /* This stream will receive events from another app. */
> > -        createStream("I need the time.", showPE);
> > -    }
> > -
> > -    @Override
> > -    protected void close() {
> > -        System.out.println("Closing ShowTimeApp...");
> > -    }
> > -
> > -    @Override
> > -    protected void onStart() {
> > -        // TODO Auto-generated method stub
> > -
> > -    }
> > -
> > -    @Override
> > -    protected void onInit() {
> > -        // TODO Auto-generated method stub
> > -
> > +        createStream("clockStream", showPE);
> >     }
> >
> >     @Override
> >     protected void onClose() {
> > -        // TODO Auto-generated method stub
> > -
> > +        System.out.println("Closing ShowTimeApp...");
> >     }
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> > index 809cebd..8cb5843 100644
> > ---
> a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> > +++
> b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
> > @@ -41,7 +41,7 @@ public class SimplePE extends ProcessingElement implements Watcher {
> >     protected void onCreate() {
> >         if (zk == null) {
> >             try {
> > -                zk = new ZooKeeper("localhost:" + 21810, 4000, this);
> > +                zk = new ZooKeeper("localhost:" + 2181, 4000, this);
> >             } catch (IOException e) {
> >                 throw new RuntimeException(e);
> >             }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> > index 25b98a6..bbfaf17 100644
> > ---
> a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> > +++
> b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
> > @@ -39,7 +39,7 @@ public class TestApp extends App {
> >             } catch (IOException e) {
> >                 throw new RuntimeException(e);
> >             }
> > -            zkClient = new ZkClient("localhost:" + 21810);
> > +            zkClient = new ZkClient("localhost:" + 2181);
> >             if (!zkClient.exists("/s4-test")) {
> >                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
> >             }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> > index 0657ee9..8eb345b 100644
> > ---
> a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> > +++
> b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
> > @@ -17,7 +17,7 @@ public class TestApp extends App {
> >     @Override
> >     protected void onInit() {
> >         try {
> > -            zkClient = new ZkClient("localhost:" + 21810);
> > +            zkClient = new ZkClient("localhost:" + 2181);
> >             if (!zkClient.exists("/s4-test")) {
> >                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
> >             }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/README.txt
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/twitter-adapter/README.txt
> b/test-apps/twitter-adapter/README.txt
> > new file mode 100644
> > index 0000000..f9bda45
> > --- /dev/null
> > +++ b/test-apps/twitter-adapter/README.txt
> > @@ -0,0 +1 @@
> > +Please refer to README.txt in twitter-counter application
> > \ No newline at end of file
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/build.gradle
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/twitter-adapter/build.gradle
> b/test-apps/twitter-adapter/build.gradle
> > index 669d62b..100837d 100644
> > --- a/test-apps/twitter-adapter/build.gradle
> > +++ b/test-apps/twitter-adapter/build.gradle
> > @@ -49,6 +49,9 @@ group = 'org.apache.s4'
> >
> >  apply plugin: 'java'
> >  apply plugin: 'eclipse'
> > +apply plugin:'application'
> > +
> > +mainClassName = "org.apache.s4.core.Main"
> >
> >  /* The app classname is set automatically from the source files. */
> >  def appClassname = ''
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> > index ee905d9..102ca10 100644
> > ---
> a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> > +++
> b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
> > @@ -1,6 +1,9 @@
> >  package org.apache.s4.example.twitter;
> >
> > +import java.io.File;
> > +import java.io.FileInputStream;
> >  import java.net.ServerSocket;
> > +import java.util.Properties;
> >  import java.util.concurrent.LinkedBlockingQueue;
> >
> >  import org.I0Itec.zkclient.ZkClient;
> > @@ -15,6 +18,7 @@ import twitter4j.StatusDeletionNotice;
> >  import twitter4j.StatusListener;
> >  import twitter4j.TwitterStream;
> >  import twitter4j.TwitterStreamFactory;
> > +import twitter4j.conf.ConfigurationBuilder;
> >
> >  public class TwitterInputAdapter extends Adapter {
> >
> > @@ -49,7 +53,20 @@ public class TwitterInputAdapter extends Adapter {
> >
> >     public void connectAndRead() throws Exception {
> >
> > -        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
> > +        ConfigurationBuilder cb = new ConfigurationBuilder();
> > +        Properties twitterProperties = new Properties();
> > +        File twitter4jPropsFile = new File(System.getProperty("user.home") + "/twitter4j.properties");
> > +        if (!twitter4jPropsFile.exists()) {
> > +            logger.error(
> > +                    "Cannot find twitter4j.properties file in this location :[{}]. Make sure it is available at this place and includes user/password credentials",
> > +                    twitter4jPropsFile.getAbsolutePath());
> > +            return;
> > +        }
> > +        twitterProperties.load(new FileInputStream(twitter4jPropsFile));
> > +
> > +        cb.setDebugEnabled(Boolean.valueOf(twitterProperties.getProperty("debug")))
> > +                .setUser(twitterProperties.getProperty("user")).setPassword(twitterProperties.getProperty("password"));
> > +        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
> >         StatusListener statusListener = new StatusListener() {
> >
> >             @Override
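
The hunk above reads the debug, user and password keys from a properties file and never closes the FileInputStream it opens. A minimal sketch of the loading step follows, assuming Java 8 or later (the commit itself predates that): try-with-resources closes the source, and missing credentials fail fast. TwitterCredentials and parse are hypothetical names, and the sketch reads from a string so that it runs without a file on disk.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder, not part of S4 or twitter4j: it mirrors the three keys
// the adapter reads from twitter4j.properties.
class TwitterCredentials {
    final boolean debug;
    final String user;
    final String password;

    private TwitterCredentials(boolean debug, String user, String password) {
        this.debug = debug;
        this.user = user;
        this.password = password;
    }

    // Parses properties text; a file-based variant could pass a reader opened on the file instead.
    static TwitterCredentials parse(String text) {
        Properties props = new Properties();
        try (Reader in = new StringReader(text)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String user = props.getProperty("user");
        String password = props.getProperty("password");
        if (user == null || password == null) {
            throw new IllegalArgumentException("user and password are required");
        }
        // A missing debug key is treated as false.
        return new TwitterCredentials(Boolean.parseBoolean(props.getProperty("debug")), user, password);
    }

    public static void main(String[] args) {
        TwitterCredentials c = parse("debug=true\nuser=alice\npassword=not-a-real-password\n");
        System.out.println(c.user + " debug=" + c.debug);
    }
}
```

Failing fast on a missing user or password gives a clearer error than passing null values on to the ConfigurationBuilder.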
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> > deleted file mode 100644
> > index cd36aaa..0000000
> > --- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
> > +++ /dev/null
> > @@ -1,18 +0,0 @@
> > -comm.queue_emmiter_size = 8000
> > -comm.queue_listener_size = 8000
> > -cluster.hosts = localhost
> > -cluster.ports = 5077
> > -cluster.name = s4-adapter-cluster
> > -cluster.zk_address = localhost:21810
> > -cluster.zk_session_timeout = 10000
> > -cluster.zk_connection_timeout = 10000
> > -comm.module = org.apache.s4.deploy.TestModule
> > -s4.logger_level = TRACE
> > -appsDir=/tmp/deploy-test
> > -tcp.partition.queue_size=1000
> > -comm.timeout=100
> > -comm.retry_delay=100
> > -comm.retries=10
> > -
> > -# specify the name of the remote cluster (there is currently only 1 remote cluster max)
> > -cluster.remote.name=s4-test-cluster
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/s4.properties
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/twitter-adapter/src/main/resources/s4.properties
> b/test-apps/twitter-adapter/src/main/resources/s4.properties
> > new file mode 100644
> > index 0000000..fdd1bf6
> > --- /dev/null
> > +++ b/test-apps/twitter-adapter/src/main/resources/s4.properties
> > @@ -0,0 +1,18 @@
> > +comm.queue_emmiter_size = 8000
> > +comm.queue_listener_size = 8000
> > +cluster.hosts = localhost
> > +cluster.ports = 5077
> > +cluster.name = s4-adapter-cluster
> > +cluster.zk_address = localhost:2181
> > +cluster.zk_session_timeout = 10000
> > +cluster.zk_connection_timeout = 10000
> > +comm.module = org.apache.s4.core.adapter.AdapterModule
> > +s4.logger_level = DEBUG
> > +appsDir=/tmp/deploy-test
> > +tcp.partition.queue_size=1000
> > +comm.timeout=100
> > +comm.retry_delay=100
> > +comm.retries=10
> > +
> > +# specify the name of the remote cluster (there is currently only 1 remote cluster max)
> > +cluster.remote.name=s4-test-cluster
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> > deleted file mode 100644
> > index 7d58c7d..0000000
> > --- a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
> > +++ /dev/null
> > @@ -1,5 +0,0 @@
> > -debug=true
> > -# you need to set those parameters with valid twitter account credentials
> > -twitter4j.user=????
> > -twitter4j.password=????
> > -
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/README.txt
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/twitter-counter/README.txt
> b/test-apps/twitter-counter/README.txt
> > new file mode 100644
> > index 0000000..d46b491
> > --- /dev/null
> > +++ b/test-apps/twitter-counter/README.txt
> > @@ -0,0 +1,33 @@
> > +An application that displays the current top 10 topics, as gathered from the twitter sample stream.
> > +It was ported and adapted from S4 0.3
> > +
> > +Architecture:
> > +- twitter-adapter app in adapter node connects to the twitter stream, extracts the tweet text and passes that to the application cluster
> > +- twitter-counter app in the application cluster receives the text of the tweets, extracts the topics, counts topic occurrences and periodically displays the top 10 topics on the console
> > +
> > +How to configure:
> > +- you need a twitter4j.properties file in your home dir, with the following properties filled:
> > +debug=true|false
> > +user=<a twitter user name>
> > +password=<the matching password>
> > +
> > +How to run:
> > +0/ make sure tools are compiled by running ./gradlew s4-tools:installApp
> > +
> > +1/ start zookeeper server
> > +./s4 zkServer
> > +
> > +2/ create adapter cluster configuration
> > +./s4 newCluster -name=s4-test-cluster -firstListeningPort=10000 -nbTasks=1
> > +
> > +3/ create application cluster configuration
> > +./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=11000 -nbTasks=<number of nodes>
> > +NOTE: - the name of the downstream cluster is currently hardcoded in <s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties. Make sure you use the same name
> > +
> > +4/ start application nodes (as many as defined in the cluster configuration, more for failover capabilities)
> > +./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
> > +
> > +5/ start adapter node
> > +./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
> > +
> > +6/ observe the topic count output (on 1 of the application nodes)
> > \ No newline at end of file
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/build.gradle
> > ----------------------------------------------------------------------
> > diff --git a/test-apps/twitter-counter/build.gradle
> b/test-apps/twitter-counter/build.gradle
> > index e737b40..e10eae0 100644
> > --- a/test-apps/twitter-counter/build.gradle
> > +++ b/test-apps/twitter-counter/build.gradle
> > @@ -49,6 +49,7 @@ group = 'org.apache.s4'
> >
> >  apply plugin: 'java'
> >  apply plugin: 'eclipse'
> > +apply plugin:'application'
> >
> >  /* The app classname is set automatically from the source files. */
> >  def appClassname = ''
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> > index 3d9a9fb..6fd68b5 100644
> > ---
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> > +++
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
> > @@ -4,6 +4,7 @@ import java.util.Iterator;
> >  import java.util.Map;
> >  import java.util.TreeSet;
> >
> > +import org.apache.s4.base.Event;
> >  import org.apache.s4.core.App;
> >  import org.apache.s4.core.ProcessingElement;
> >  import org.slf4j.Logger;
> > @@ -14,16 +15,16 @@ import com.google.common.collect.Sets;
> >
> >  public class TopNTopicPE extends ProcessingElement {
> >
> > +    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
> > +    Map<String, Integer> countedTopics = Maps.newHashMap();
> > +
> >     public TopNTopicPE(App app) {
> >         super(app);
> > -        // TODO Auto-generated constructor stub
> > +        logger.info("key: [{}]", getId());
> >     }
> >
> > -    Map<String, Integer> countedTopics = Maps.newHashMap();
> > -    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
> > -
> > -    public void onEvent(TopicSeenEvent event) {
> > -        countedTopics.put(event.topic, event.count);
> > +    public void onEvent(Event event) {
> > +        countedTopics.put(event.get("topic"), event.get("count",
> Integer.class));
> >     }
> >
> >     public void onTime() {
> > @@ -32,6 +33,8 @@ public class TopNTopicPE extends ProcessingElement {
> >             sortedTopics.add(new TopNEntry(topicCount.getKey(),
> topicCount.getValue()));
> >         }
> >
> > +        logger.info("\n------------------");
> > +
> >         int i = 0;
> >         Iterator<TopNEntry> iterator = sortedTopics.iterator();
> >         long time = System.currentTimeMillis();
> > @@ -39,6 +42,7 @@ public class TopNTopicPE extends ProcessingElement {
> >             TopNEntry entry = iterator.next();
> >             logger.info("{} : topic [{}] count [{}]",
> >                     new String[] { String.valueOf(time), entry.topic,
> String.valueOf(entry.count) });
> > +            i++;
> >         }
> >     }
> >
> >
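
For readers following the TopNTopicPE hunk above: the added i++ suggests the reporting loop is bounded by the counter i, although the loop condition itself is not visible in the hunk. As a rough illustration of the "top N by count" idea, here is a minimal sketch in plain Java. It does not use the S4 classes; TopNSketch and topN are hypothetical names, and a sorted list stands in for the TreeSet<TopNEntry> used by the real PE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the ranking done in TopNTopicPE.onTime();
// it is not part of the S4 code base.
class TopNSketch {

    // Returns at most n entries, highest count first; ties are broken by topic name.
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        // The explicit bound plays the role of the counter i in the patched loop.
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("s4", 12);
        counts.put("apache", 30);
        counts.put("zookeeper", 4);
        long time = System.currentTimeMillis();
        for (Map.Entry<String, Integer> e : topN(counts, 2)) {
            System.out.println(time + " : topic [" + e.getKey() + "] count [" + e.getValue() + "]");
        }
    }
}
```

Breaking ties by topic name is a choice made for this sketch only; how TopNEntry orders equal counts is not shown in the diff.
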
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> > index a6d1478..5a41231 100644
> > ---
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> > +++
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
> > @@ -1,27 +1,36 @@
> >  package org.apache.s4.example.twitter;
> >
> > +import org.apache.s4.base.Event;
> >  import org.apache.s4.core.App;
> >  import org.apache.s4.core.ProcessingElement;
> >  import org.apache.s4.core.Stream;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> >
> >  // keyed by topic name
> >  public class TopicCountAndReportPE extends ProcessingElement {
> >
> > -    Stream<TopicSeenEvent> downStream;
> > +    Stream<Event> downStream;
> >     int threshold = 10;
> >     int count;
> > +    boolean firstEvent = true;
> > +
> > +    static Logger logger =
> LoggerFactory.getLogger(TopicCountAndReportPE.class);
> >
> >     public TopicCountAndReportPE(App app) {
> >         super(app);
> > -        // TODO Auto-generated constructor stub
> >     }
> >
> > -    public void setDownstream(Stream<TopicSeenEvent> stream) {
> > +    public void setDownstream(Stream<Event> stream) {
> >         this.downStream = stream;
> >     }
> >
> > -    public void onEvent(TopicSeenEvent event) {
> > -        count += event.count;
> > +    public void onEvent(Event event) {
> > +        if (firstEvent) {
> > +            logger.info("Handling new topic [{}]", getId());
> > +            firstEvent = false;
> > +        }
> > +        count += event.get("count", Integer.class);
> >     }
> >
> >     @Override
> > @@ -34,14 +43,15 @@ public class TopicCountAndReportPE extends
> ProcessingElement {
> >         if (count < threshold) {
> >             return;
> >         }
> > -        TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(),
> count);
> > +        Event topicSeenEvent = new Event();
> > +        topicSeenEvent.put("topic", String.class, getId());
> > +        topicSeenEvent.put("count", Integer.class, count);
> > +        topicSeenEvent.put("aggregationKey", String.class,
> "aggregationValue");
> >         downStream.put(topicSeenEvent);
> >     }
> >
> >     @Override
> >     protected void onRemove() {
> > -        // TODO Auto-generated method stub
> > -
> >     }
> >
> >  }
> >
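
The TopicCountAndReportPE change above replaces the typed TopicSeenEvent with a generic key/value Event. A minimal sketch of the same accumulate-then-report pattern follows, assuming a plain Map<String, Object> in place of org.apache.s4.base.Event. TopicCounterSketch is a hypothetical class, and returning null below the threshold is a simplification of the early return in the real PE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopicCountAndReportPE; a Map replaces the S4 Event.
class TopicCounterSketch {

    private final String topic;   // plays the role of getId() in the real PE
    private final int threshold;
    private int count;

    TopicCounterSketch(String topic, int threshold) {
        this.topic = topic;
        this.threshold = threshold;
    }

    // Adds the "count" attribute of an incoming event to the running total.
    void onEvent(Map<String, Object> event) {
        count += (Integer) event.get("count");
    }

    // Builds a report once the total reaches the threshold, otherwise returns null.
    Map<String, Object> onTime() {
        if (count < threshold) {
            return null;
        }
        Map<String, Object> report = new HashMap<>();
        report.put("topic", topic);
        report.put("count", count);
        // Same constant attribute as in the patch above.
        report.put("aggregationKey", "aggregationValue");
        return report;
    }

    public static void main(String[] args) {
        TopicCounterSketch pe = new TopicCounterSketch("s4", 2);
        Map<String, Object> seen = new HashMap<>();
        seen.put("count", 1);
        pe.onEvent(seen);
        System.out.println(pe.onTime()); // below threshold, prints null
        pe.onEvent(seen);
        System.out.println(pe.onTime().get("count"));
    }
}
```

Because the KeyFinder for the AggregatedTopicSeen stream returns the same constant for every event, all reports presumably end up in a single TopNTopicPE instance, which is what a global top 10 needs.
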
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> > index 501d6fd..9936ed0 100644
> > ---
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> > +++
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
> > @@ -6,13 +6,16 @@ import org.apache.s4.base.Event;
> >  import org.apache.s4.core.App;
> >  import org.apache.s4.core.ProcessingElement;
> >  import org.apache.s4.core.Streamable;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> >
> >  import com.google.common.base.Splitter;
> >
> >  public class TopicExtractorPE extends ProcessingElement {
> >
> >     static private ServerSocket serverSocket;
> > -    Streamable<TopicSeenEvent> downStream;
> > +    Streamable<Event> downStream;
> > +    static Logger logger =
> LoggerFactory.getLogger(TopicExtractorPE.class);
> >
> >     public TopicExtractorPE(App app) {
> >         super(app);
> > @@ -24,18 +27,22 @@ public class TopicExtractorPE extends
> ProcessingElement {
> >
> >     }
> >
> > -    public void setDownStream(Streamable<TopicSeenEvent> stream) {
> > +    public void setDownStream(Streamable<Event> stream) {
> >         this.downStream = stream;
> >     }
> >
> >     public void onEvent(Event event) {
> >         String text = event.get("statusText", String.class);
> > +        logger.trace("event text [{}]", text);
> >         if (text.contains("#")) {
> >             Iterable<String> split =
> Splitter.on("#").omitEmptyStrings().trimResults()
> >                     .split(text.substring(text.indexOf("#") + 1,
> text.length()));
> >             for (String topic : split) {
> >                 String topicOnly = topic.split(" ")[0];
> > -                downStream.put(new TopicSeenEvent(topicOnly, 1));
> > +                Event event2 = new Event();
> > +                event2.put("topic", String.class, topicOnly);
> > +                event2.put("count", Integer.class, 1);
> > +                downStream.put(event2);
> >             }
> >         }
> >     }
> >
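
To make the hashtag handling in TopicExtractorPE.onEvent easier to follow, here is a minimal sketch of the same extraction in plain Java. It assumes String.split in place of Guava's Splitter, and HashtagSketch and extractTopics are hypothetical names. It is meant to mirror the logic above roughly, not to be a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the topic extraction in TopicExtractorPE.onEvent().
class HashtagSketch {

    // Returns the word following each '#' in the text, in order of appearance.
    static List<String> extractTopics(String text) {
        List<String> topics = new ArrayList<>();
        int first = text.indexOf('#');
        if (first < 0) {
            return topics; // no hashtag at all
        }
        // Everything after the first '#' is cut into one piece per hashtag.
        for (String piece : text.substring(first + 1).split("#")) {
            String trimmed = piece.trim();
            if (trimmed.isEmpty()) {
                continue; // e.g. "##" or a '#' followed only by whitespace
            }
            // Keep only the first word of the piece, as topic.split(" ")[0] does above.
            topics.add(trimmed.split(" ")[0]);
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(extractTopics("loving #s4 and #apache today"));
    }
}
```

In the patched PE each extracted topic is then wrapped in a new Event carrying "topic" and a "count" of 1 before being put on the downstream stream.
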
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> > deleted file mode 100644
> > index b28d61e..0000000
> > ---
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
> > +++ /dev/null
> > @@ -1,17 +0,0 @@
> > -package org.apache.s4.example.twitter;
> > -
> > -import org.apache.s4.base.Event;
> > -
> > -public class TopicSeenEvent extends Event {
> > -
> > -    public String topic;
> > -    public int count;
> > -    public String reportKey = "x";
> > -
> > -    public TopicSeenEvent(String topic, int count) {
> > -        super();
> > -        this.topic = topic;
> > -        this.count = count;
> > -    }
> > -
> > -}
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> > index cf0fb40..f6edd8c 100644
> > ---
> a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> > +++
> b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
> > @@ -29,13 +29,13 @@ public class TwitterCounterApp extends App {
> >             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
> >             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
> >             @SuppressWarnings("unchecked")
> > -            Stream<TopicSeenEvent> aggregatedTopicStream =
> createStream("AggregatedTopicSeen", new KeyFinder() {
> > +            Stream<Event> aggregatedTopicStream =
> createStream("AggregatedTopicSeen", new KeyFinder() {
> >
> >                 @Override
> >                 public List<String> get(Event arg0) {
> >                     return new ArrayList<String>() {
> >                         {
> > -                            add("x");
> > +                            add("aggregationKey");
> >                         }
> >                     };
> >                 }
> > @@ -44,13 +44,13 @@ public class TwitterCounterApp extends App {
> >             TopicCountAndReportPE topicCountAndReportPE =
> createPE(TopicCountAndReportPE.class);
> >             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
> >             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
> > -            Stream<TopicSeenEvent> topicSeenStream =
> createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
> > +            Stream<Event> topicSeenStream = createStream("TopicSeen",
> new KeyFinder<Event>() {
> >
> >                 @Override
> > -                public List<String> get(final TopicSeenEvent arg0) {
> > +                public List<String> get(final Event arg0) {
> >                     return new ArrayList<String>() {
> >                         {
> > -                            add(arg0.topic);
> > +                            add(arg0.get("topic"));
> >                         }
> >                     };
> >                 }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/resources/default.s4.properties
> > ----------------------------------------------------------------------
> > diff --git
> a/test-apps/twitter-counter/src/main/resources/default.s4.properties
> b/test-apps/twitter-counter/src/main/resources/default.s4.properties
> > new file mode 100644
> > index 0000000..d5da3f3
> > --- /dev/null
> > +++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
> > @@ -0,0 +1,15 @@
> > +comm.queue_emmiter_size = 8000
> > +comm.queue_listener_size = 8000
> > +cluster.hosts = localhost
> > +cluster.ports = 5077
> > +cluster.name = s4-adapter-cluster
> > +cluster.zk_address = localhost:21810
> > +cluster.zk_session_timeout = 10000
> > +cluster.zk_connection_timeout = 10000
> > +comm.module = org.apache.s4.core.CustomModule
> > +s4.logger_level = DEBUG
> > +appsDir=/tmp/deploy-test
> > +tcp.partition.queue_size=1000
> > +comm.timeout=100
> > +comm.retry_delay=100
> > +comm.retries=10
> >
>
