eagle-commits mailing list archives

From h..@apache.org
Subject [35/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
new file mode 100644
index 0000000..02e211b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/UnionUtils.scala
@@ -0,0 +1,41 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+object UnionUtils {
+  def join(producers : StreamProducer*) : StreamProducer = {
+    producers.head.streamUnion(producers.drop(1))
+  }
+
+  def join(producers : java.util.List[StreamProducer]) : StreamProducer = {
+    val newList = new util.ArrayList(producers)
+    val head = newList.get(0)
+    newList.remove(0)
+    head.streamUnion(newList.asScala);
+  }
+
+  def join(producers : List[StreamProducer]) : StreamProducer = {
+    val head = producers.head
+    head.streamUnion(producers.tail);
+  }
+}
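All three join overloads above reduce to the same rule: the first producer in the collection absorbs the remaining ones via streamUnion. The snippet below is a hypothetical, self-contained sketch of that head-unions-the-rest pattern; Producer and union() are stand-ins for Eagle's StreamProducer and streamUnion(), which are defined elsewhere in this module.

import scala.collection.JavaConverters._

trait Producer {
  def union(others: Seq[Producer]): Producer
}

object JoinSketch {
  // varargs overload: the first producer absorbs the rest
  def join(producers: Producer*): Producer =
    producers.head.union(producers.drop(1))

  // java.util.List overload: convert, then reuse the same head/tail split
  def join(producers: java.util.List[Producer]): Producer = {
    val list = producers.asScala.toList
    list.head.union(list.tail)
  }

  // toy Producer used only to demonstrate the call shape
  case class Named(name: String) extends Producer {
    def union(others: Seq[Producer]): Producer =
      Named((name +: others.collect { case Named(n) => n }).mkString("+"))
  }

  def main(args: Array[String]): Unit =
    println(join(Named("a"), Named("b"), Named("c"))) // prints Named(a+b+c)
}

Note that the java.util.List overload in the commit copies the incoming list into a new ArrayList before removing the head, so the caller's list is never mutated; the asScala.toList conversion above achieves the same effect implicitly.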

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
new file mode 100644
index 0000000..754abe0
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/JsonMessageDeserializer.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.kafka
+
+import java.io.IOException
+import java.util
+import java.util.Properties
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * @since  11/6/15
+ */
+case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{
+  private val objectMapper: ObjectMapper = new ObjectMapper
+  private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer])
+
+  override def deserialize(bytes: Array[Byte]): AnyRef = {
+    var map: util.Map[String, _] = null
+    try {
+      map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
+    } catch {
+      case e: IOException => {
+        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
+      }
+    }
+    map
+  }
+}
\ No newline at end of file
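JsonMessageDeserializer is a thin wrapper around Jackson: the raw Kafka message bytes are bound to a java.util.TreeMap, and any IOException is logged, leaving the result null. The standalone sketch below exercises the same ObjectMapper.readValue call outside the spout, printing to stderr instead of going through SLF4J.

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util

import com.fasterxml.jackson.databind.ObjectMapper

object JsonToMapSketch {
  private val mapper = new ObjectMapper

  def parse(bytes: Array[Byte]): util.Map[String, _] =
    try {
      // same call as JsonMessageDeserializer: bind the JSON object to a sorted map
      mapper.readValue(bytes, classOf[util.TreeMap[String, _]])
    } catch {
      case e: IOException =>
        System.err.println("Failed to deserialize json from: " +
          new String(bytes, StandardCharsets.UTF_8) + " : " + e.getMessage)
        null
    }

  def main(args: Array[String]): Unit = {
    println(parse("""{"b":1,"a":"x"}""".getBytes(StandardCharsets.UTF_8))) // {a=x, b=1}
    println(parse("not json".getBytes(StandardCharsets.UTF_8)))            // null
  }
}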

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
new file mode 100644
index 0000000..eff35fc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/kafka/KafkaStreamMonitor.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.kafka
+
+import org.apache.eagle.datastream.StormStreamApp
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
+
+/**
+ * @since  11/6/15
+ */
+class KafkaStreamMonitorApp extends StormStreamApp{
+  val streamName = get[String]("eagle.stream.name","eventStream")
+  val streamExecutorId = get[String]("eagle.stream.executor",s"${streamName}Executor")
+
+  set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
+
+  source(new KafkaSourcedSpoutProvider).renameOutputFields(1).withName(streamName)
+    .alertWithConsumer(streamName, streamExecutorId)
+}
+
+object KafkaStreamMonitor extends KafkaStreamMonitorApp
\ No newline at end of file
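KafkaStreamMonitorApp reads its stream name and executor id from configuration with defaults ("eventStream" and "<streamName>Executor") through a get[String](key, default) helper inherited from StormStreamApp, which is not part of this diff. The sketch below shows an equivalent lookup written directly against Typesafe Config; getOrElse is a hypothetical helper name, not Eagle API.

import com.typesafe.config.{Config, ConfigFactory}

object ConfigDefaultsSketch {
  // hypothetical helper mirroring the get[String](key, default) call in the app above
  def getOrElse(config: Config, path: String, default: String): String =
    if (config.hasPath(path)) config.getString(path) else default

  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("eagle.stream.name = auditLogStream")

    val streamName       = getOrElse(config, "eagle.stream.name", "eventStream")
    val streamExecutorId = getOrElse(config, "eagle.stream.executor", s"${streamName}Executor")

    println(s"$streamName / $streamExecutorId") // auditLogStream / auditLogStreamExecutor
  }
}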

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/dataproc/util/TestConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/dataproc/util/TestConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/dataproc/util/TestConfigOptionParser.java
deleted file mode 100644
index d23c090..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/dataproc/util/TestConfigOptionParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.util;
-
-import junit.framework.Assert;
-import org.apache.commons.cli.ParseException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * @since 8/23/15
- */
-public class TestConfigOptionParser {
-    private final static Logger LOG = LoggerFactory.getLogger(TestConfigOptionParser.class);
-
-    @Test
-    public void testValidCommandArguments() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-                "-D","key5=\"--param having whitespace\""
-        };
-
-        Map<String,String> config = new ConfigOptionParser().parseConfig(arguments);
-
-        Assert.assertTrue(config.containsKey("key1"));
-        Assert.assertTrue(config.containsKey("key2"));
-        Assert.assertTrue(config.containsKey("key3"));
-        Assert.assertTrue(config.containsKey("key4"));
-        Assert.assertEquals("value1", config.get("key1"));
-        Assert.assertEquals("value2", config.get("key2"));
-        Assert.assertEquals("value3=something",config.get("key3"));
-        Assert.assertEquals("",config.get("key4"));
-        Assert.assertEquals("\"--param having whitespace",config.get("key5"));
-    }
-
-    @Test
-    public void testValidCommandArgumentsAsSystem() throws ParseException {
-        String[] arguments = new String[]{
-                "-D","key1=value1",
-                "-D","key2=value2",
-                "-D","key3=value3=something",
-                "-D","key4=",
-        };
-
-        new ConfigOptionParser().load(arguments);
-
-        Assert.assertTrue(System.getProperties().containsKey("key1"));
-        Assert.assertTrue(System.getProperties().containsKey("key2"));
-        Assert.assertTrue(System.getProperties().containsKey("key3"));
-        Assert.assertTrue(System.getProperties().containsKey("key4"));
-
-        Assert.assertEquals("value1", System.getProperty("key1"));
-        Assert.assertEquals("value2", System.getProperty("key2"));
-        Assert.assertEquals("value3=something",System.getProperty("key3"));
-        Assert.assertEquals("",System.getProperty("key4"));
-    }
-
-    @Test
-    public void testInvalidCommandArgument1()  {
-        String[] arguments = new String[]{
-                "-D","key1"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " +e.getMessage());
-        }
-    }
-
-    @Test
-    public void testInvalidCommandArgument2()  {
-        String[] arguments = new String[]{
-                "-D","=value"
-        };
-
-        try {
-            new ConfigOptionParser().parseConfig(arguments);
-            Assert.fail("Should throw ParseException");
-        } catch (ParseException e) {
-            LOG.info("Expected exception: " + e.getMessage());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/JavaEchoExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/JavaEchoExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/JavaEchoExecutor.java
deleted file mode 100644
index 9340d2e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/JavaEchoExecutor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JavaEchoExecutor extends JavaStormStreamExecutor1<String>{
-    private static Logger LOG = LoggerFactory.getLogger(JavaEchoExecutor.class);
-    private Config config;
-    @Override
-    public void prepareConfig(Config config){
-        this.config = config;
-    }
-
-    /**
-     * give business code a chance to do initialization for example daemon thread
-     * this method is executed in remote machine
-     */
-    @Override
-    public void init(){
-    }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> collector){
-        collector.collect(new Tuple1(input.get(0)));
-        LOG.info("echo " + input);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestJavaMain.java
deleted file mode 100644
index 60bba21..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestJavaMain.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.junit.Test;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class TestJavaMain {
-    public static class SerializableFunction1<T1,R> extends AbstractFunction1<T1, R> implements Serializable {
-        @Override
-        public Object apply(Object v1) {
-            return null;
-        }
-    }
-
-    //@Test
-    public void testGeneral(){
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).withParallelism(2);
-        env.execute();
-    }
-
-    //@Test
-    public void testMap(){
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
-        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).
-                map1(f1);
-        env.execute();
-    }
-
-    @Test
-    public void test() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestKafkaStreamMonitor.java
deleted file mode 100644
index 61c52d5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/eagle/datastream/TestKafkaStreamMonitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream;
-
-import eagle.datastream.kafka.KafkaStreamMonitorApp;
-
-/**
- * @since 11/7/15
- */
-public class TestKafkaStreamMonitor {
-    public static void main(String[] args){
-        new KafkaStreamMonitorApp().main(args);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
new file mode 100644
index 0000000..efcb0e7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/dataproc/util/TestConfigOptionParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.util;
+
+import junit.framework.Assert;
+import org.apache.commons.cli.ParseException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * @since 8/23/15
+ */
+public class TestConfigOptionParser {
+    private final static Logger LOG = LoggerFactory.getLogger(TestConfigOptionParser.class);
+
+    @Test
+    public void testValidCommandArguments() throws ParseException {
+        String[] arguments = new String[]{
+                "-D","key1=value1",
+                "-D","key2=value2",
+                "-D","key3=value3=something",
+                "-D","key4=",
+                "-D","key5=\"--param having whitespace\""
+        };
+
+        Map<String,String> config = new ConfigOptionParser().parseConfig(arguments);
+
+        Assert.assertTrue(config.containsKey("key1"));
+        Assert.assertTrue(config.containsKey("key2"));
+        Assert.assertTrue(config.containsKey("key3"));
+        Assert.assertTrue(config.containsKey("key4"));
+        Assert.assertEquals("value1", config.get("key1"));
+        Assert.assertEquals("value2", config.get("key2"));
+        Assert.assertEquals("value3=something",config.get("key3"));
+        Assert.assertEquals("",config.get("key4"));
+        Assert.assertEquals("\"--param having whitespace",config.get("key5"));
+    }
+
+    @Test
+    public void testValidCommandArgumentsAsSystem() throws ParseException {
+        String[] arguments = new String[]{
+                "-D","key1=value1",
+                "-D","key2=value2",
+                "-D","key3=value3=something",
+                "-D","key4=",
+        };
+
+        new ConfigOptionParser().load(arguments);
+
+        Assert.assertTrue(System.getProperties().containsKey("key1"));
+        Assert.assertTrue(System.getProperties().containsKey("key2"));
+        Assert.assertTrue(System.getProperties().containsKey("key3"));
+        Assert.assertTrue(System.getProperties().containsKey("key4"));
+
+        Assert.assertEquals("value1", System.getProperty("key1"));
+        Assert.assertEquals("value2", System.getProperty("key2"));
+        Assert.assertEquals("value3=something",System.getProperty("key3"));
+        Assert.assertEquals("",System.getProperty("key4"));
+    }
+
+    @Test
+    public void testInvalidCommandArgument1()  {
+        String[] arguments = new String[]{
+                "-D","key1"
+        };
+
+        try {
+            new ConfigOptionParser().parseConfig(arguments);
+            Assert.fail("Should throw ParseException");
+        } catch (ParseException e) {
+            LOG.info("Expected exception: " +e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidCommandArgument2()  {
+        String[] arguments = new String[]{
+                "-D","=value"
+        };
+
+        try {
+            new ConfigOptionParser().parseConfig(arguments);
+            Assert.fail("Should throw ParseException");
+        } catch (ParseException e) {
+            LOG.info("Expected exception: " + e.getMessage());
+        }
+    }
+}
\ No newline at end of file
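The assertions above pin down the -D parsing contract: each option is split on the first '=' only (so key3 keeps "value3=something"), an empty value after '=' is legal, and an argument with a missing key or missing '=' is rejected. The sketch below illustrates just that splitting rule; it is not the actual ConfigOptionParser implementation, and a plain exception stands in for the commons-cli ParseException the real parser throws.

object DashDParseSketch {
  // split each "-D key=value" pair on the FIRST '=' only; empty values are allowed,
  // a missing key or missing '=' is rejected
  def parse(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array("-D", kv) =>
        val idx = kv.indexOf('=')
        if (idx <= 0) throw new IllegalArgumentException(s"Invalid config option: $kv")
        kv.substring(0, idx) -> kv.substring(idx + 1)
    }.toMap

  def main(args: Array[String]): Unit = {
    val config = parse(Array("-D", "key1=value1", "-D", "key3=value3=something", "-D", "key4="))
    println(config("key3"))          // value3=something
    println(config("key4").isEmpty)  // true
  }
}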

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
new file mode 100644
index 0000000..7b21288
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JavaEchoExecutor extends JavaStormStreamExecutor1<String>{
+    private static Logger LOG = LoggerFactory.getLogger(JavaEchoExecutor.class);
+    private Config config;
+    @Override
+    public void prepareConfig(Config config){
+        this.config = config;
+    }
+
+    /**
+     * gives business code a chance to do initialization, for example starting a daemon thread;
+     * this method is executed on the remote machine
+     */
+    @Override
+    public void init(){
+    }
+
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> collector){
+        collector.collect(new Tuple1(input.get(0)));
+        LOG.info("echo " + input);
+    }
+}
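JavaEchoExecutor shows the minimal executor contract on the Java side: flatMap receives the tuple's fields as a list and re-emits the first field through the collector. Below is a self-contained sketch of that shape; Collector here is a local stand-in for Eagle's Collector type, and the cast mirrors the raw Tuple1 construction in the Java code.

object EchoSketch {
  // local stand-in for Eagle's Collector interface
  trait Collector[T] { def collect(t: T): Unit }

  // mirrors JavaEchoExecutor.flatMap: re-emit the first field of the incoming tuple
  def echo(input: java.util.List[Object], collector: Collector[Tuple1[String]]): Unit = {
    collector.collect(Tuple1(input.get(0).asInstanceOf[String]))
    println("echo " + input)
  }

  def main(args: Array[String]): Unit = {
    val out = new Collector[Tuple1[String]] {
      def collect(t: Tuple1[String]): Unit = println("collected: " + t._1)
    }
    echo(java.util.Arrays.asList[Object]("hello"), out)
  }
}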

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
new file mode 100644
index 0000000..07c19de
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class TestJavaMain {
+    public static class SerializableFunction1<T1,R> extends AbstractFunction1<T1, R> implements Serializable {
+        @Override
+        public Object apply(Object v1) {
+            return null;
+        }
+    }
+
+    //@Test
+    public void testGeneral(){
+        Config config = ConfigFactory.load();
+        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).withParallelism(2);
+        env.execute();
+    }
+
+    //@Test
+    public void testMap(){
+        Config config = ConfigFactory.load();
+        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
+        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).
+                map1(f1);
+        env.execute();
+    }
+
+    @Test
+    public void test() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
new file mode 100644
index 0000000..5651e74
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import org.apache.eagle.datastream.kafka.KafkaStreamMonitorApp;
+
+/**
+ * @since 11/7/15
+ */
+public class TestKafkaStreamMonitor {
+    public static void main(String[] args){
+        new KafkaStreamMonitorApp().main(args);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
index 62045f6..0262d62 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
@@ -21,7 +21,7 @@
 	"alertExecutorConfigs" : {
 		"eventStreamExecutor" : {
 			"parallelism" : 1,
-			"partitioner" : "eagle.alert.policy.DefaultPolicyPartitioner"
+			"partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
 			"needValidation" : "true"
 		}
 	},
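The only change to application.conf is the partitioner's fully-qualified class name, which must track the package rename because the configured value is resolved to a class at runtime rather than at compile time. The sketch below illustrates that coupling; the reflective-loading comment describes the usual pattern, not Eagle's actual loading code.

import com.typesafe.config.ConfigFactory

object PartitionerLookupSketch {
  def main(args: Array[String]): Unit = {
    val hocon =
      """alertExecutorConfigs.eventStreamExecutor {
        |  partitioner = "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
        |  needValidation = "true"
        |}""".stripMargin

    val config    = ConfigFactory.parseString(hocon)
    val className = config.getString("alertExecutorConfigs.eventStreamExecutor.partitioner")

    // a class name like this is typically handed to Class.forName at runtime, so a
    // stale "eagle.alert.policy..." value would fail with ClassNotFoundException
    println(s"partitioner class to load: $className")
  }
}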

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestDAGExpansion.scala
deleted file mode 100644
index 12c3254..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestDAGExpansion.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
-
-object testStreamUnionExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-  //env.execute
-}
-
-object testStreamGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
-  //env.execute
-}
-
-object testStreamUnionAndGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-  //env.execute
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).withName("testSpout1")
-                  .flatMap(WordPrependForAlertExecutor("test")).withName("prepend")
-                  .alertWithConsumer("s1", "alert1")
-  //env.execute
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansionWithUnion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).withName("testSpout1").flatMap(WordPrependForAlertExecutor("test")).withName("prepend") //.map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", true)
-  //env.execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestScala.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestScala.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestScala.scala
deleted file mode 100644
index 592b7e0..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestScala.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait A { val x: Int }
-case class B(val x: Int, y: Int) extends A
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormNodes.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormNodes.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormNodes.scala
deleted file mode 100644
index 46e3532..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormNodes.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-
-case class TestSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  override def nextTuple : Unit = {
-    _collector.emit(util.Arrays.asList("abc"))
-    LOG.info("send spout data abc")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("value"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class TestKeyValueSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  var count : Int = 0
-  override def nextTuple : Unit = {
-    if(count%3 == 0) {
-      _collector.emit(util.Arrays.asList("abc", new Integer(1)))
-    }else{
-      _collector.emit(util.Arrays.asList("xyz", new Integer(1)))
-    }
-    count += 1;
-    LOG.info("send spout data abc/xyz")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("word", "count"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class EchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(EchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head.asInstanceOf[String]))
-    LOG.info("echo " + input.head)
-  }
-}
-
-case class WordPrependExecutor(prefix : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(prefix + "_" + input.head))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor(prefix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple2("key1",value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor2(prefix : String) extends StormStreamExecutor1[util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple1(value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordAppendExecutor(suffix : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head + "_" + suffix))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class WordAppendForAlertExecutor(suffix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", input.head + "_" + suffix)
-    outputCollector.collect(Tuple2("key1", value))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class PatternAlertExecutor(pattern : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    LOG.info("send out " + input.head)
-    if(input.head.asInstanceOf[String].matches(pattern)){
-      LOG.info("Alert hadppens for input " + input.head + " and for pattern " + pattern)
-    }
-  }
-}
-
-case class GroupedEchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    LOG.info("get " + input(0))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormRunner.scala
deleted file mode 100644
index b4219fa..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStormRunner.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
-
-/**
- * explicit union
- * a.union(b,c).alert() means (a,b,c)'s output is united into alert()
- * before running this testing, we should define in eagle service one policy and one stream schema
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object UnionForAlert extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
-  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", false)
-  //env.execute
-}
-
-/**
- * test alert after flatMap
- */
-object TestAlertAfterFlatMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout())
-                  .flatMap(WordPrependForAlertExecutor("test"))
-                  .alert(util.Arrays.asList("s1"), "alert1", false)
-  //env.execute
-}
-
-/**
- * test alert after Map
- */
-object TestAlertAfterMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout())
-    .flatMap(WordPrependForAlertExecutor2("test"))
-    .map2(a => ("key", a))
-    .alert(util.Arrays.asList("s1"), "alert1", false)
-  //env.execute
-}
-
-object StormRunnerWithoutSplitOrJoin extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-    .flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithSplit extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val toBeSplit = env.newSource(TestSpout()).flatMap(EchoExecutor())
-  toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
-  toBeSplit.flatMap(WordAppendExecutor("test"))
-  //env.execute
-}
-
-object StormRunnerWithUnion extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependExecutor("test"))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendExecutor("test"))
-  tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
-  //env.execute
-}
-
-object StormRunnerWithFilter extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithJavaExecutor extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpout extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).withParallelism(2)
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpoutRenameOutputFields extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestKeyValueSpout()).renameOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).withParallelism(2)
-  //env.execute
-}
\ No newline at end of file
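The curl commands quoted in the comments above seed the Eagle service with a stream schema and a policy before the alert topologies are run. The sketch below performs the equivalent POST against the generic entity endpoint with plain HttpURLConnection; the host, port, and payload are placeholders copied from those comments, not values this commit defines.

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object RegisterStreamSchemaSketch {
  def main(args: Array[String]): Unit = {
    // endpoint and payload taken from the curl examples in the test comments
    val url = new URL("http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService")
    val payload =
      """[{"prefix":"alertStreamSchema",
        |  "tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},
        |  "attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]""".stripMargin

    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))

    println(s"AlertStreamSchemaService responded: ${conn.getResponseCode}")
    conn.disconnect()
  }
}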

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStreamDAGBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStreamDAGBuilder.scala
deleted file mode 100644
index c83e8a3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/eagle/datastream/TestStreamDAGBuilder.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-class TestStreamDAGBuilder extends FlatSpec with Matchers{
-//  "a single source DAG with groupBy" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val groupby = env.newSource(null).groupBy(0)
-//    groupby.flatMap(WordPrependExecutor("test"))
-//    groupby.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG without stream join" should "be traversed sequentially like specified" in{
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split" should "has more than one tail producer" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = echo.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split and join" should "has join" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)).
-//      flatMap(PatternAlertExecutor("test*"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-//
-//  "multiple sources with split and union" should "has union" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val source1 = env.newSource(TestSpout())
-//    val source2 = env.newSource(TestSpout())
-//    val source3 = env.newSource(TestSpout())
-//
-//    val tail1 = source1.flatMap(WordPrependExecutor("test"))
-//    val tail2 = source2.filter(_=>true)
-//    val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)).
-//      flatMap(PatternAlertExecutor("abc*"))
-//
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case SourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
new file mode 100644
index 0000000..cd587d1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
@@ -0,0 +1,80 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.{ConfigFactory, Config}
+
+object testStreamUnionExpansion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = new StormExecutionEnvironment(config)
+  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
+  tail1.streamUnion(List(tail2)).map1(a => "xyz")
+  //env.execute
+}
+
+object testStreamGroupbyExpansion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = new StormExecutionEnvironment(config)
+  env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
+  //env.execute
+}
+
+object testStreamUnionAndGroupbyExpansion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = new StormExecutionEnvironment(config)
+  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
+  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
+  tail1.streamUnion(List(tail2)).map1(a => "xyz")
+  //env.execute
+}
+
+/**
+ * 1. stream schema
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
+ * 2. policy
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ */
+object testAlertExpansion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = new StormExecutionEnvironment(config)
+  val tail1 = env.newSource(TestSpout()).withName("testSpout1")
+                  .flatMap(WordPrependForAlertExecutor("test")).withName("prepend")
+                  .alertWithConsumer("s1", "alert1")
+  //env.execute
+}
+
+/**
+ * 1. stream schema
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
+ * 2. policy
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ */
+object testAlertExpansionWithUnion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = new StormExecutionEnvironment(config)
+  val tail1 = env.newSource(TestSpout()).withName("testSpout1").flatMap(WordPrependForAlertExecutor("test")).withName("prepend") //.map2(a => ("key1",a))
+  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
+  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", true)
+  //env.execute
+}
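The three expansion tests above exercise union, groupBy and alert separately. A minimal sketch combining them (illustrative only, not part of this commit; it reuses TestSpout and the *ForAlertExecutor helpers defined in TestStormNodes.scala below, and assumes the stream schema and policy from the curl calls have already been registered):

package org.apache.eagle.datastream

import java.util
import com.typesafe.config.{Config, ConfigFactory}

object testCombinedExpansionSketch extends App {
  val config: Config = ConfigFactory.load
  val env = new StormExecutionEnvironment(config)
  // prepend on one branch, append on the other, then group, union and alert
  val left  = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1", a)).groupBy(1)
  val right = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1", a)).groupBy(0)
  left.streamUnion(List(right)).alert(util.Arrays.asList("s1", "s2"), "alert1", true)
  // env.execute  // needs a running Storm/Eagle environment
}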

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
new file mode 100644
index 0000000..d785689
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait A { val x: Int }
+case class B(val x: Int, y: Int) extends A
\ No newline at end of file
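A minimal usage sketch for the pair above (illustrative, not part of this commit): the case class satisfies trait A by binding the abstract member x.

package org.apache.eagle.datastream

object TestScalaSketch extends App {
  val a: A = B(1, 2)  // B binds x = 1 and adds y = 2
  assert(a.x == 1)
}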

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
new file mode 100644
index 0000000..231ebab
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+case class TestSpout() extends BaseRichSpout {
+  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
+  var _collector : SpoutOutputCollector = null
+  override def nextTuple : Unit = {
+    _collector.emit(util.Arrays.asList("abc"))
+    LOG.info("send spout data abc")
+    Thread.sleep(1000)
+  }
+  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
+    declarer.declare(new Fields("value"))
+  }
+  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
+    _collector = collector
+  }
+}
+
+case class TestKeyValueSpout() extends BaseRichSpout {
+  val LOG = LoggerFactory.getLogger(TestKeyValueSpout.getClass)
+  var _collector : SpoutOutputCollector = null
+  var count : Int = 0
+  override def nextTuple : Unit = {
+    if(count%3 == 0) {
+      _collector.emit(util.Arrays.asList("abc", new Integer(1)))
+    }else{
+      _collector.emit(util.Arrays.asList("xyz", new Integer(1)))
+    }
+    count += 1;
+    LOG.info("send spout data abc/xyz")
+    Thread.sleep(1000)
+  }
+  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
+    declarer.declare(new Fields("word", "count"))
+  }
+  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
+    _collector = collector
+  }
+}
+
+case class EchoExecutor() extends StormStreamExecutor1[String] {
+  val LOG = LoggerFactory.getLogger(EchoExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
+    outputCollector.collect(Tuple1(input.head.asInstanceOf[String]))
+    LOG.info("echo " + input.head)
+  }
+}
+
+case class WordPrependExecutor(prefix : String) extends StormStreamExecutor1[String] {
+  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
+    outputCollector.collect(Tuple1(prefix + "_" + input.head))
+    LOG.info("preappend " + prefix + "_" + input.head)
+  }
+}
+
+case class WordPrependForAlertExecutor(prefix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
+  val LOG = LoggerFactory.getLogger(WordPrependForAlertExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
+    val value = new util.TreeMap[Object, Object]()
+    value.put("word", prefix + "_" + input.head)
+    outputCollector.collect(Tuple2("key1",value))
+    LOG.info("preappend " + prefix + "_" + input.head)
+  }
+}
+
+case class WordPrependForAlertExecutor2(prefix : String) extends StormStreamExecutor1[util.SortedMap[Object, Object]] {
+  val LOG = LoggerFactory.getLogger(WordPrependForAlertExecutor2.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={
+    val value = new util.TreeMap[Object, Object]()
+    value.put("word", prefix + "_" + input.head)
+    outputCollector.collect(Tuple1(value))
+    LOG.info("preappend " + prefix + "_" + input.head)
+  }
+}
+
+case class WordAppendExecutor(suffix : String) extends StormStreamExecutor1[String] {
+  val LOG = LoggerFactory.getLogger(WordAppendExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
+    outputCollector.collect(Tuple1(input.head + "_" + suffix))
+    LOG.info("append " + input.head + "_" + suffix)
+  }
+}
+
+case class WordAppendForAlertExecutor(suffix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
+  val LOG = LoggerFactory.getLogger(WordAppendForAlertExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
+    val value = new util.TreeMap[Object, Object]()
+    value.put("word", input.head + "_" + suffix)
+    outputCollector.collect(Tuple2("key1", value))
+    LOG.info("append " + input.head + "_" + suffix)
+  }
+}
+
+case class PatternAlertExecutor(pattern : String) extends StormStreamExecutor1[String] {
+  val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
+    LOG.info("send out " + input.head)
+    if(input.head.asInstanceOf[String].matches(pattern)){
+      LOG.info("Alert hadppens for input " + input.head + " and for pattern " + pattern)
+    }
+  }
+}
+
+case class GroupedEchoExecutor() extends StormStreamExecutor1[String] {
+  val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass)
+  var config : Config = null
+  override def prepareConfig(config : Config){this.config = config}
+  override def init {}
+  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
+    LOG.info("get " + input(0))
+  }
+}
\ No newline at end of file
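Each executor above follows the same contract: keep the Config passed to prepareConfig, do any setup in init, and emit tuples through the Collector in flatMap. A minimal sketch of an additional executor in that style (the name and behaviour are illustrative, not part of this commit):

package org.apache.eagle.datastream

import com.typesafe.config.Config
import org.slf4j.LoggerFactory

case class UpperCaseExecutor() extends StormStreamExecutor1[String] {
  val LOG = LoggerFactory.getLogger(UpperCaseExecutor.getClass)
  var config: Config = null
  override def prepareConfig(config: Config) { this.config = config }  // keep the topology config
  override def init {}                                                 // no per-task setup needed
  override def flatMap(input: Seq[AnyRef], outputCollector: Collector[Tuple1[String]]): Unit = {
    // emit the upper-cased form of the first input field
    outputCollector.collect(Tuple1(input.head.asInstanceOf[String].toUpperCase))
  }
}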

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
new file mode 100644
index 0000000..c550182
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.{ConfigFactory, Config}
+
+/**
+ * explicit union
+ * a.union(b,c).alert() means (a,b,c)'s output is united into alert()
+ * before running this testing, we should define in eagle service one policy and one stream schema
+ * 1. stream schema
+ * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
+ * 2. policy
+ * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ */
+object UnionForAlert extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
+  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", false)
+  //env.execute
+}
+
+/**
+ * test alert after flatMap
+ */
+object TestAlertAfterFlatMap extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  val tail1 = env.newSource(TestSpout())
+                  .flatMap(WordPrependForAlertExecutor("test"))
+                  .alert(util.Arrays.asList("s1"), "alert1", false)
+  //env.execute
+}
+
+/**
+ * test alert after Map
+ */
+object TestAlertAfterMap extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  val tail1 = env.newSource(TestSpout())
+    .flatMap(WordPrependForAlertExecutor2("test"))
+    .map2(a => ("key", a))
+    .alert(util.Arrays.asList("s1"), "alert1", false)
+  //env.execute
+}
+
+object StormRunnerWithoutSplitOrJoin extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
+    .flatMap(PatternAlertExecutor("test.*"))
+  //env.execute
+}
+
+object StormRunnerWithSplit extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  val toBeSplit = env.newSource(TestSpout()).flatMap(EchoExecutor())
+  toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
+  toBeSplit.flatMap(WordAppendExecutor("test"))
+  //env.execute
+}
+
+object StormRunnerWithUnion extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependExecutor("test"))
+  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendExecutor("test"))
+  tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
+  //env.execute
+}
+
+object StormRunnerWithFilter extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
+    filter(_=>false).
+    flatMap(PatternAlertExecutor("test.*"))
+  //env.execute
+}
+
+object StormRunnerWithJavaExecutor extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  env.newSource(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
+    filter(_=>false).
+    flatMap(PatternAlertExecutor("test.*"))
+  //env.execute
+}
+
+object StormRunnerWithKeyValueSpout extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  env.newSource(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+  //env.execute
+}
+
+object StormRunnerWithKeyValueSpoutRenameOutputFields extends App{
+  val config : Config = ConfigFactory.load;
+  val env = ExecutionEnvironmentFactory.getStorm(config)
+  env.newSource(TestKeyValueSpout()).renameOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+  //env.execute
+}
\ No newline at end of file
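Every runner above wires a topology but leaves env.execute commented out, since submission needs a live Storm/Eagle environment and, for the alert runners, the schema and policy registered via the curl calls in the scaladoc. A minimal end-to-end sketch (illustrative, not part of this commit), assuming such an environment is available:

package org.apache.eagle.datastream

import com.typesafe.config.{Config, ConfigFactory}

object StormRunnerSketch extends App {
  val config: Config = ConfigFactory.load
  val env = ExecutionEnvironmentFactory.getStorm(config)
  env.newSource(TestSpout())
    .flatMap(EchoExecutor())
    .flatMap(WordPrependExecutor("test"))
    .flatMap(PatternAlertExecutor("test.*"))
  env.execute  // assumed to submit the wired topology, as in the commented-out calls above
}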

