eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [34/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:10:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
deleted file mode 100644
index 0d0638d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.dataproc.impl.aggregate.SimpleAggregateExecutor;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Created on 1/20/16.
- */
-public class TestSimpleAggregateExecutor {
-
-    @Test
-    public void test() throws Exception {
-        SimpleAggregateExecutor sae = new SimpleAggregateExecutor(new String[]{"s1"},
-                "define stream s1(eagleAlertContext object, timestamp long, metric string);" +
-                        " @info(name='query')" +
-                        " from s1 select * insert into tmp;"
-                ,
-                "siddhiCEPEngine",
-                0,
-                1);
-
-        Config config = ConfigFactory.empty();
-        sae.prepareConfig(config);
-        sae.init();
-
-        List<Object> tuple = new ArrayList<>(3);
-        tuple.add(0, "groupbykey");
-        tuple.add(1, "s1");
-        SortedMap value = new TreeMap();
-        value.put("timestamp", System.currentTimeMillis());
-        value.put("metric", "name-of-the-metric");
-        tuple.add(2, value);
-
-        final AtomicInteger count = new AtomicInteger();
-        sae.flatMap(tuple, new Collector<Tuple2<String, AggregateEntity>>(){
-            @Override
-            public void collect(Tuple2<String, AggregateEntity> stringAggregateEntityTuple2) {
-                System.out.print(stringAggregateEntityTuple2._1());
-                count.incrementAndGet();
-            }
-        });
-
-        Thread.sleep(3000);
-        Assert.assertEquals(1, count.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
deleted file mode 100644
index ed5d705..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.datastream.core.*;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
-import org.junit.Before;
-import org.junit.Test;
-import scala.collection.Seq;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @since Dec 18, 2015
- *
- */
-public class TestStreamAggregate {
-
-	private Config config;
-
-	@SuppressWarnings("serial")
-	private final class SimpleSpout extends BaseRichSpout {
-		@SuppressWarnings("rawtypes")
-		@Override
-		public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		}
-		@Override
-		public void nextTuple() {
-		}
-		@Override
-		public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		}
-	}
-
-	public static class TestEnvironment extends StormExecutionEnvironment {
-		private static final long serialVersionUID = 1L;
-		public TestEnvironment(Config conf) {
-			super(conf);
-		}
-		@Override
-		public void execute(StreamDAG dag) {
-			System.out.println("DAT completed!");
-		}
-	}
-	
-	public static class DummyStrategy implements PartitionStrategy {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public int balance(String key, int buckNum) {
-			return 0;
-		}
-	};
-	
-	public static class DummyExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity>  {
-		@Override
-		public void prepareConfig(Config config) {
-		}
-		@Override
-		public void init() {
-		}
-		@Override
-		public void flatMap(List input, Collector collector) {
-		}
-	}
-	
-	@Before
-	public void setUp() {
-		System.setProperty("config.resource", "/application.conf");
-		ConfigFactory.invalidateCaches();
-		config = ConfigFactory.load();
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
-	@Test
-	public void testAggregate1() {
-		StormExecutionEnvironment exe = new TestEnvironment(config);
-		
-		BaseRichSpout spout = new SimpleSpout();
-		StormSourceProducer ssp = exe.fromSpout(spout);
-		
-		ssp.flatMap(new FlatMapper<String>() {
-			@Override
-			public void flatMap(Seq<Object> input, Collector<String> collector) {
-				// do nothing
-			}
-		}).aggregate(Arrays.asList("c3EsLogEventStream"), "qid", new DummyStrategy());
-		
-		try {
-			exe.execute();
-			Assert.fail("customzied flat mapper(non java storm executor) before analyze is not supported!");
-		} catch (Exception e ){
-		}
-	}
-	
-	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
-	@Test
-	public void testAggregate() {
-		StormExecutionEnvironment exe = new TestEnvironment(config);
-		StormSourceProducer ssp = exe.fromSpout(new SimpleSpout());
-		DummyExecutor dummy = new DummyExecutor();
-		ssp.flatMap(dummy).aggregate(Arrays.asList("c3EsLogEventStream"), "analyzeStreamExecutor", new DummyStrategy());
-
-		try {
-			exe.execute();
-		} catch (Exception e) {
-			Assert.fail("customized flat mapper before");
-		}
-		// Assertion
-		DirectedAcyclicGraph<StreamProducer<Object>, StreamConnector<Object, Object>> dag = exe.dag();
-		Assert.assertEquals("three vertex", 3, dag.vertexSet().size());
-		boolean hasWrapped = false;
-		for (StreamProducer<Object> obj : dag.vertexSet()) {
-			if (obj instanceof FlatMapProducer) {
-				if (((FlatMapProducer) obj).mapper() instanceof JavaStormExecutorForAlertWrapper) {
-					hasWrapped = true;
-					Assert.assertEquals("dummy executor should be wrapped in the alert wrapper func", dummy,
-							((JavaStormExecutorForAlertWrapper) ((FlatMapProducer) obj).mapper() ).getDelegate());
-
-				}
-			}
-		}
-		Assert.assertTrue(hasWrapped);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
deleted file mode 100644
index 87b1947..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	"envContextConfig" : {
-		"env" : "storm",
-		"mode" : "local",
-		"topologyName" : "kafka-monitor-topology",
-		"parallelismConfig" : {
-			"kafkaMsgConsumer" : 1
-		}
-	},
-	"dataSourceConfig": {
-		"topic" : "nn_jmx_metric_sandbox",
-		"zkConnection" : "localhost:2181",
-		"zkConnectionTimeoutMS" : 15000,
-		"consumerGroupId" : "EagleConsumer",
-		"fetchSize" : 1048586,
-		"transactionZKServers" : "localhost",
-		"transactionZKPort" : 2181,
-		"transactionZKRoot" : "/consumers",
-		"transactionStateUpdateMS" : 2000
-	},
-	"alertExecutorConfigs" : {
-		"eventStreamExecutor" : {
-			"parallelism" : 1,
-			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-			"needValidation" : "true"
-		}
-	},
-	"persistExecutorConfigs" {
-		"persistExecutor1" : {
-			"kafka": {
-				"bootstrap_servers" : "localhost",
-				"topics" : {
-					"defaultOutput" : "downSampleOutput"
-				}
-			}
-		}
-	},
-	"aggregateExecutorConfigs" : {
-		"aggregateStreamExecutor" : {
-			"parallelism" : 1,
-			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-			"needValidation" : "true"
-		}
-	},
-	"eagleProps" : {
-		"site" : "sandbox",
-		"application": "eventSource",
-		"dataJoinPollIntervalSec" : 30,
-		"mailHost" : "mail.host.com",
-		"mailSmtpPort":"25",
-		"mailDebug" : "true",
-		"eagleService": {
-			"host": "localhost",
-			"port": 38080,
-			"username": "admin",
-			"password": "secret"
-		}
-	},
-	"dynamicConfigSource" : {
-		"enabled" : true,
-		"initDelayMillis" : 0,
-		"delayMillis" : 30000
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
deleted file mode 100644
index 3e57b6c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-export EAGLE_SERVICE_USER="admin"
-export EAGLE_SERVICE_PASSWD="secret"
-export EAGLE_SERVICE_HOST="localhost"
-export EAGLE_SERVICE_PORT=38080
-
-# AlertDataSource: data sources bound to sites
-echo "Importing AlertDataSourceService for stream... "
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"eventSource"}, "enabled": "true", "config" : ""}]'
-
-## AlertStreamService: alert streams generated from data source
-echo ""
-echo "Importing AlertStreamService for stream... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"eventSource","streamName":"eventStream"},"desc":"alert event stream from hdfs audit log"}]'
-
-## AlertExecutorService: what alert streams are consumed by alert executor
-echo ""
-echo "Importing AlertExecutorService for stream ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"eventSource","alertExecutorId":"eventStreamExecutor","streamName":"eventStream"},"desc":"alert executor for event stream"}]'
-
-## AlertStreamSchemaService: schema for event from alert stream
-echo ""
-echo "Importing AlertStreamSchemaService for stream ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"timestamp"},"attrDescription":"event timestamp","attrType":"long","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"name"},"attrDescription":"event name","attrType":"string","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"value"},"attrDescription":"event value","attrType":"integer","category":"","attrValueResolver":""}]'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
deleted file mode 100644
index 3499c46..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
- eagle.log.dir=./logs
- eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
- log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
- log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
- log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
deleted file mode 100644
index 9d378c9..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package org.apache.eagle.datastream
-
-import org.apache.eagle.datastream.storm.StormWrapperUtils
-import org.scalatest.{FlatSpec, Matchers}
-
-class StormWrapperUtilsSpec extends FlatSpec with Matchers{
-  import StormWrapperUtils._
-  "StormWrapperUtils" should "convert Tuple{1,2,3,..} to java.util.List" in {
-    val list1 = productAsJavaList(new Tuple1("a"))
-    list1.size() should be(1)
-    list1.get(0) should be("a")
-
-    val list2 = productAsJavaList(new Tuple2("a","b"))
-    list2.size() should be(2)
-    list2.get(0) should be("a")
-    list2.get(1) should be("b")
-
-    val list3 = productAsJavaList(new Tuple3("a","b","c"))
-    list3.size() should be(3)
-    list3.get(0) should be("a")
-    list3.get(1) should be("b")
-    list3.get(2) should be("c")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
deleted file mode 100644
index 4339e5a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-object testStreamUnionExpansion extends App{
-  val config : Config = ConfigFactory.load
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-}
-
-object testStreamGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
-  //env.execute
-}
-
-object testStreamUnionAndGroupbyExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
-  tail1.streamUnion(List(tail2)).map1(a => "xyz")
-//  env.execute()
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1")
-                  .flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend")
-                  .alertWithConsumer("s1", "alert1")
-  //env.execute
-}
-
-/**
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object testAlertExpansionWithUnion extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") //.map2(a => ("key1",a))
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = true)
-  //env.execute
-}
-
-
-object testStreamUnionExpansionWithSharedSpout extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val source = env.fromSpout(TestSpout())
-  val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).map1(a  => {
-    println(a)
-    "xyz"
-  })
-//    env.execute
-}
-
-object testStreamUnionExpansionWithSharedSpout_2 extends App{
-  val config : Config = ConfigFactory.load;
-  val env = new StormExecutionEnvironment(config)
-  val source = env.fromSpout(TestSpout())
-  val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  source.streamUnion(List(tail1)).map1(a  => {
-    println(a)
-    "xyz"
-  })
-//  env.execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
deleted file mode 100644
index ccd3deb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-/**
- * @since  12/5/15
- */
-object TestExecutionEnvironment extends App{
-  val env0 = ExecutionEnvironments.get[StormExecutionEnvironment]
-  println(env0)
-  val config = ConfigFactory.load()
-  val env1 = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](config)
-  println(env1)
-  val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value"))
-  println(env2)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
deleted file mode 100644
index d785689..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait A { val x: Int }
-case class B(val x: Int, y: Int) extends A
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
deleted file mode 100644
index 231ebab..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-
-case class TestSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  override def nextTuple : Unit = {
-    _collector.emit(util.Arrays.asList("abc"))
-    LOG.info("send spout data abc")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("value"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class TestKeyValueSpout() extends BaseRichSpout {
-  val LOG = LoggerFactory.getLogger(TestSpout.getClass)
-  var _collector : SpoutOutputCollector = null
-  var count : Int = 0
-  override def nextTuple : Unit = {
-    if(count%3 == 0) {
-      _collector.emit(util.Arrays.asList("abc", new Integer(1)))
-    }else{
-      _collector.emit(util.Arrays.asList("xyz", new Integer(1)))
-    }
-    count += 1;
-    LOG.info("send spout data abc/xyz")
-    Thread.sleep(1000)
-  }
-  override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields("word", "count"))
-  }
-  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={
-    _collector = collector
-  }
-}
-
-case class EchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(EchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head.asInstanceOf[String]))
-    LOG.info("echo " + input.head)
-  }
-}
-
-case class WordPrependExecutor(prefix : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(prefix + "_" + input.head))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor(prefix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple2("key1",value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordPrependForAlertExecutor2(prefix : String) extends StormStreamExecutor1[util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", prefix + "_" + input.head)
-    outputCollector.collect(Tuple1(value))
-    LOG.info("preappend " + prefix + "_" + input.head)
-  }
-}
-
-case class WordAppendExecutor(suffix : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    outputCollector.collect(Tuple1(input.head + "_" + suffix))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class WordAppendForAlertExecutor(suffix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] {
-  val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={
-    val value = new util.TreeMap[Object, Object]()
-    value.put("word", input.head + "_" + suffix)
-    outputCollector.collect(Tuple2("key1", value))
-    LOG.info("append " + input.head + "_" + suffix)
-  }
-}
-
-case class PatternAlertExecutor(pattern : String) extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    LOG.info("send out " + input.head)
-    if(input.head.asInstanceOf[String].matches(pattern)){
-      LOG.info("Alert hadppens for input " + input.head + " and for pattern " + pattern)
-    }
-  }
-}
-
-case class GroupedEchoExecutor() extends StormStreamExecutor1[String] {
-  val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass)
-  var config : Config = null
-  override def prepareConfig(config : Config){this.config = config}
-  override def init {}
-  override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={
-    LOG.info("get " + input(0))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
deleted file mode 100644
index c071eb2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * explicit union
- * a.union(b,c).alert() means (a,b,c)'s output is united into alert()
- * before running this testing, we should define in eagle service one policy and one stream schema
- * 1. stream schema
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
- * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
- */
-object UnionForAlert extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
-  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false)
-  env.execute()
-}
-
-/**
- * test alert after flatMap
- */
-object TestAlertAfterFlatMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout())
-                  .flatMap(WordPrependForAlertExecutor("test"))
-                  .alert(Seq("s1"), "alert1", consume = false)
-  //env.execute
-}
-
-/**
- * test alert after Map
- */
-object TestAlertAfterMap extends App{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout())
-    .flatMap(WordPrependForAlertExecutor2("test"))
-    .map2(a => ("key", a))
-    .alert(Seq("s1"), "alert1", false)
-  //env.execute
-}
-
-object StormRunnerWithoutSplitOrJoin extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-    .flatMap(PatternAlertExecutor("test.*"))
-//  env.execute()
-}
-
-object StormRunnerWithSplit extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor())
-  toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
-  toBeSplit.flatMap(WordAppendExecutor("test"))
-//  env.execute()
-}
-
-object StormRunnerWithUnion extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test"))
-  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test"))
-  tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
-  env.execute()
-}
-
-object StormRunnerWithFilter extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithJavaExecutor extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
-    filter(_=>false).
-    flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpout extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).parallelism(2)
-  //env.execute
-}
-
-object StormRunnerWithKeyValueSpoutRenameOutputFields extends Application{
-  val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironments.getStorm(config)
-  env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).parallelism(2)
-  //env.execute
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
deleted file mode 100644
index 48dc7e5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-class TestStreamDAGBuilder extends FlatSpec with Matchers{
-//  "a single source DAG with groupBy" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val groupby = env.newSource(null).groupBy(0)
-//    groupby.flatMap(WordPrependExecutor("test"))
-//    groupby.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source DAG without stream join" should "be traversed sequentially like specified" in{
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match{
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split" should "has more than one tail producer" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = echo.flatMap(WordAppendExecutor("test"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//  }
-//
-//  "a single source with split and join" should "has join" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val echo = env.newSource(null).flatMap(EchoExecutor())
-//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
-//    val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)).
-//      flatMap(PatternAlertExecutor("test*"))
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t == null)
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-//
-//  "multiple sources with split and union" should "has union" in {
-//    val config = ConfigFactory.load()
-//    val env = ExecutionEnvironmentFactory.getStorm(config)
-//    val source1 = env.newSource(TestSpout())
-//    val source2 = env.newSource(TestSpout())
-//    val source3 = env.newSource(TestSpout())
-//
-//    val tail1 = source1.flatMap(WordPrependExecutor("test"))
-//    val tail2 = source2.filter(_=>true)
-//    val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)).
-//      flatMap(PatternAlertExecutor("abc*"))
-//
-//    val dag = new StreamDAGBuilder(env.heads).build()
-//    val iter = dag.iterator()
-//
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case StormSourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FilterProducer(fn) =>
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case SourceProducer(t) => assert(t.isInstanceOf[String])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
-//      case _ => assert(false)
-//    }
-//    assert(iter.hasNext)
-//    iter.next() match {
-//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*"))
-//      case _ => assert(false)
-//    }
-//    assert(!iter.hasNext)
-//  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
deleted file mode 100644
index 7754765..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream
-
-import org.apache.eagle.datastream.core.StreamContext
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-/**
- * @since  12/4/15
- */
-case class Entity(name:String,value:Double,var inc:Int=0)
-
-object TestIterableWithGroupBy extends App {
-
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.from(tuples)
-    .groupByKey(_.name)
-    .map(o => {o.inc += 2;o})
-    .filter(_.name != "b")
-    .filter(_.name != "c")
-    .groupByKey(o=>(o.name,o.value))
-    .map(o => (o.name,o))
-    .map(o => (o._1,o._2.value,o._2.inc))
-    .foreach(println)
-
-  env.execute()
-}
-
-object TestIterableWithGroupByWithStreamContext extends App {
-  val stream = StreamContext(args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  stream.from(tuples)
-    .groupByKey(_.name)
-    .map(o => {o.inc += 2;o})
-    .filter(_.name != "b")
-    .filter(_.name != "c")
-    .groupByKey(o=>(o.name,o.value))
-    .map(o => (o.name,o))
-    .map(o => (o._1,o._2.value,o._2.inc))
-    .foreach(println)
-
-  stream.submit[StormExecutionEnvironment]
-}
-
-object TestIterableWithGroupByCircularly extends App{
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.from(tuples,recycle = true)
-    .map(o => {o.inc += 2;o})
-    .groupByKey(_.name)
-    .foreach(println)
-  env.execute()
-}
-
-object TestGroupByKeyOnSpoutproxy extends App{
-  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
-
-  val tuples = Seq(
-    Entity("a", 1),
-    Entity("a", 2),
-    Entity("a", 3),
-    Entity("b", 2),
-    Entity("c", 3),
-    Entity("d", 3)
-  )
-
-  env.fromSpout[String](TestSpout())
-    .groupByKey(_.charAt(0))
-    .foreach(println)
-  env.execute()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml
deleted file mode 100644
index 740de03..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>eagle</groupId>
-        <artifactId>eagle-data-process-parent</artifactId>
-        <version>0.3.0</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>eagle-stream-process-base</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_${scala.version}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>eagle</groupId>
-            <artifactId>eagle-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.version}</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <!--<dependency>-->
-            <!--<groupId>org.apache.storm</groupId>-->
-            <!--<artifactId>storm-core</artifactId>-->
-            <!--<exclusions>-->
-                <!--<exclusion>-->
-                    <!--<groupId>ch.qos.logback</groupId>-->
-                    <!--<artifactId>logback-classic</artifactId>-->
-                <!--</exclusion>-->
-                <!--<exclusion>-->
-                    <!--<groupId>log4j</groupId>-->
-                    <!--<artifactId>log4j</artifactId>-->
-                <!--</exclusion>-->
-                <!--<exclusion>-->
-                    <!--<groupId>org.slf4j</groupId>-->
-                    <!--<artifactId>log4j-over-slf4j</artifactId>-->
-                <!--</exclusion>-->
-            <!--</exclusions>-->
-        <!--</dependency>-->
-    </dependencies>
-
-    <build>
-    <plugins>
-        <plugin>
-            <groupId>org.scala-tools</groupId>
-            <artifactId>maven-scala-plugin</artifactId>
-            <executions>
-                <execution>
-                    <id>scala-compile-first</id>
-                    <phase>process-resources</phase>
-                    <goals>
-                        <goal>add-source</goal>
-                        <goal>compile</goal>
-                    </goals>
-                </execution>
-                <execution>
-                    <id>scala-test-compile</id>
-                    <phase>process-test-resources</phase>
-                    <goals>
-                        <goal>testCompile</goal>
-                    </goals>
-                </execution>
-            </executions>
-        </plugin>
-
-        <plugin>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest-maven-plugin</artifactId>
-            <configuration>
-                <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                <junitxml>.</junitxml>
-                <filereports>TestSuite.txt</filereports>
-            </configuration>
-            <executions>
-                <execution>
-                    <id>test</id>
-                    <goals>
-                        <goal>test</goal>
-                    </goals>
-                </execution>
-            </executions>
-        </plugin>
-    </plugins>
-    </build>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
deleted file mode 100644
index 0c4b1ab..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.core;
-
-import java.io.Serializable;
-
-/**
- * expose simple interface for streaming executor to populate output data
- *
- */
-public interface EagleOutputCollector extends Serializable{
-	void collect(ValuesArray t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
deleted file mode 100644
index 5df7e55..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.core;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JsonSerDeserUtils {
-	private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class);
-
-	public static <T> String serialize(T o) throws Exception{
-		return serialize(o, null);
-	}
-	
-	public static <T> String serialize(T o, List<Module> modules) throws Exception {
-		ObjectMapper mapper = new ObjectMapper();
-		if (modules != null) { 
-			mapper.registerModules(modules);
-		}
-		return mapper.writeValueAsString(o);
-	}
-
-	public static <T> T deserialize(String value, Class<T> cls) throws Exception{
-		return deserialize(value, cls, null);
-	}
-	
-	public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{
-		ObjectMapper mapper = new ObjectMapper();
-		if (modules != null) { 
-			mapper.registerModules(modules);
-		}
-		return mapper.readValue(value, cls);	
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
deleted file mode 100644
index fc2a016..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.core;
-
-public class StreamingProcessConstants {
-	public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
-	public static final String EVENT_STREAM_NAME = "streamName";
-	public static final String EVENT_ATTRIBUTE_MAP = "value";
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
deleted file mode 100644
index 9971cb2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.core;
-
-import java.util.ArrayList;
-
-/**
- * multiple datapoints are stored within one ValuesArray object
- * sent out
- */
-public class ValuesArray extends ArrayList<Object>{
-	private static final long serialVersionUID = -8218427810421668178L;
-
-	public ValuesArray() {
-        
-    }
-    
-    public ValuesArray(Object... vals) {
-        super(vals.length);
-        for(Object o: vals) {
-            add(o);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
deleted file mode 100644
index f40bb76..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.util;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.cli.*;
-
-import java.util.Map;
-
-/**
- * @since 8/22/15
- */
-public abstract class AbstractConfigOptionParser {
-
-    // private final Options options;
-    private final Parser parser;
-
-    public AbstractConfigOptionParser(){
-        parser = parser();
-        //options = options();
-    }
-
-    /**
-     * @return Parser
-     */
-    protected abstract Parser parser();
-
-    /**
-     * @return Options
-     */
-    protected abstract Options options();
-
-    public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException;
-
-    /**
-     * Load config as system properties
-     *
-     * @param arguments command line arguments
-     * @throws ParseException
-     */
-    public Config load(String[] arguments) throws ParseException {
-        Map<String,String> configProps = parseConfig(arguments);
-        for(Map.Entry<String,String> entry:configProps.entrySet()){
-            System.setProperty(entry.getKey(),entry.getValue());
-        }
-        System.setProperty("config.trace", "loads");
-        return ConfigFactory.load();
-    }
-
-    public CommandLine parse(String[] arguments) throws ParseException {
-        return this.parser.parse(this.options(),arguments);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
deleted file mode 100644
index bbd4e38..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.util;
-
-import org.apache.commons.cli.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class ConfigOptionParser extends AbstractConfigOptionParser {
-    private final static String CONFIG_OPT_FLAG = "D";
-
-    @Override
-    protected Parser parser() {
-        return new BasicParser();
-    }
-
-    @Override
-    protected Options options() {
-        Options options = new Options();
-        options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\"");
-        return options;
-    }
-
-    @Override
-    public Map<String,String> parseConfig(String[] arguments) throws ParseException {
-        CommandLine cmd = parse(arguments);
-        return parseCommand(cmd);
-    }
-
-    protected Map<String,String> parseCommand(CommandLine cmd) throws ParseException {
-        Map<String,String> result = new HashMap<>();
-        if(cmd.hasOption(CONFIG_OPT_FLAG)){
-            String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);
-            for(String value:values){
-                int eqIndex = value.indexOf("=");
-                if(eqIndex>0 && eqIndex<value.length()){
-                    String k = value.substring(0,eqIndex);
-                    String v = value.substring(eqIndex+1,value.length());
-                    if(result.containsKey(k)){
-                        throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value);
-                    }else{
-                        result.put(k,v);
-                    }
-                }else{
-                    throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value");
-                }
-            }
-        }
-        return result;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
deleted file mode 100644
index 7e66478..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-
-public interface JavaMapper {
-    List<Object> map(List<Object> input);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
deleted file mode 100644
index 39ce6c2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.eagle.datastream;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * @since 12/8/15
- */
-public interface JavaTypeCompatible {
-    Class<?> getType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
deleted file mode 100644
index 0b17775..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.partition;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface DataDistributionDao extends Serializable {
-
-    List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
deleted file mode 100644
index 0614388..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.partition;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public interface PartitionAlgorithm extends Serializable {
-    Map<String, Integer> partition(List<Weight> weights, int k);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
deleted file mode 100644
index e431f28..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.partition;
-
-import java.io.Serializable;
-
-public interface PartitionStrategy extends Serializable {
-
-    int balance(String key, int buckNum);
-}



Mime
View raw message