eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [5/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping
Date Wed, 16 Dec 2015 06:01:47 GMT
EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping

- Decouple StreamProducer = StreamInfo + StreamProtocol
- Support typesafe DSL for StreamProducer
- Support KeyedStream and groupByKey
- Decouple ExecutionEnvironment

https://issues.apache.org/jira/browse/EAGLE-66

Author: @haoch <hao@apache.org>
Reviewer: @RalphSu <suliangfei@gmail.com>

Closes #26 #17


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/52b8e58b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/52b8e58b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/52b8e58b

Branch: refs/heads/master
Commit: 52b8e58b1af53273782454f52e61b4f4700626c9
Parents: 2734b42
Author: Hao Chen <hao@apache.org>
Authored: Wed Dec 16 13:58:10 2015 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Wed Dec 16 13:58:10 2015 +0800

----------------------------------------------------------------------
 .../alert/config/EmailNotificationConfig.java   |   2 +-
 .../apache/eagle/executor/AlertExecutor.java    |   2 +-
 .../impl/storm/AbstractStormSpoutProvider.java  |  29 --
 .../dataproc/impl/storm/StormSpoutProvider.java |  29 ++
 .../hdfs/HDFSSourcedStormSpoutProvider.java     |   5 +-
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |   7 +-
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |  16 +-
 .../partition/CustomPartitionGrouping.java      |  51 ++++
 .../storm/partition/EagleCustomGrouping.java    |  51 ----
 .../org/apache/eagle/datastream/JavaMapper.java |  26 --
 .../datastream/JavaMapperStormExecutor.java     |   3 +-
 .../JavaStormExecutorForAlertWrapper.java       |   1 +
 .../eagle/datastream/utils/JavaReflections.java |  31 ++
 .../AbstractStreamProducerGraph.scala           |  29 --
 .../datastream/AbstractTopologyCompiler.scala   |  21 --
 .../datastream/AbstractTopologyExecutor.scala   |  21 --
 .../datastream/AlertExecutorConsumerUtils.scala |  75 -----
 .../eagle/datastream/ExecutionEnvironment.scala |  73 -----
 .../datastream/ExecutionEnvironments.scala      | 134 +++++++++
 .../eagle/datastream/FilterBoltWrapper.scala    |  48 ---
 .../apache/eagle/datastream/GraphPrinter.scala  |  37 ---
 .../eagle/datastream/JavaStormBoltWrapper.scala |  52 ----
 .../eagle/datastream/MapBoltWrapper.scala       |  68 -----
 .../eagle/datastream/NodeNameSelector.scala     |  26 --
 .../eagle/datastream/OutputFieldNameConst.scala |  21 --
 .../apache/eagle/datastream/SpoutProxy.scala    |  59 ----
 .../eagle/datastream/StormBoltFactory.scala     |  47 ---
 .../eagle/datastream/StormBoltWrapper.scala     |  60 ----
 .../StormExecutorForAlertWrapper.scala          |  43 ---
 .../eagle/datastream/StormSpoutFactory.scala    |  46 ---
 .../eagle/datastream/StormStreamDAG.scala       |  68 -----
 .../datastream/StormStreamDAGTransformer.scala  |  46 ---
 .../datastream/StormTopologyCompiler.scala      | 108 -------
 .../datastream/StormTopologyExecutorImpl.scala  |  74 -----
 .../eagle/datastream/StreamAlertExpansion.scala | 198 -------------
 .../apache/eagle/datastream/StreamAppDSL.scala  | 120 --------
 .../eagle/datastream/StreamConnector.scala      |  36 ---
 .../eagle/datastream/StreamDAGExpansion.scala   |  27 --
 .../datastream/StreamGroupbyExpansion.scala     |  65 ----
 .../eagle/datastream/StreamNameExpansion.scala  |  41 ---
 .../StreamParallelismConfigExpansion.scala      |  55 ----
 .../eagle/datastream/StreamProducer.scala       | 221 --------------
 .../eagle/datastream/StreamUnionExpansion.scala |  62 ----
 .../apache/eagle/datastream/UnionUtils.scala    |  41 ---
 .../core/AbstractTopologyCompiler.scala         |  21 ++
 .../core/AbstractTopologyExecutor.scala         |  21 ++
 .../eagle/datastream/core/Configuration.scala   |  74 +++++
 .../datastream/core/ExecutionEnvironment.scala  |  80 +++++
 .../datastream/core/StreamAlertExpansion.scala  | 203 +++++++++++++
 .../eagle/datastream/core/StreamConnector.scala |  97 ++++++
 .../eagle/datastream/core/StreamDAG.scala       |  68 +++++
 .../datastream/core/StreamDAGExpansion.scala    |  27 ++
 .../datastream/core/StreamDAGTransformer.scala  |  45 +++
 .../core/StreamGroupbyExpansion.scala           |  77 +++++
 .../datastream/core/StreamNameExpansion.scala   |  49 ++++
 .../core/StreamParallelismConfigExpansion.scala |  61 ++++
 .../eagle/datastream/core/StreamProducer.scala  | 294 +++++++++++++++++++
 .../datastream/core/StreamProducerGraph.scala   |  29 ++
 .../eagle/datastream/core/StreamProtocol.scala  | 176 +++++++++++
 .../datastream/core/StreamSourceBuilder.scala   |  47 +++
 .../datastream/core/StreamUnionExpansion.scala  |  71 +++++
 .../kafka/JsonMessageDeserializer.scala         |  45 ---
 .../datastream/kafka/KafkaStreamMonitor.scala   |  35 ---
 .../datastream/storm/AbstractStreamBolt.scala   | 119 ++++++++
 .../datastream/storm/FilterBoltWrapper.scala    |  41 +++
 .../datastream/storm/ForeachBoltWrapper.scala   |  44 +++
 .../datastream/storm/IterableStreamSpout.scala  |  71 +++++
 .../datastream/storm/JavaStormBoltWrapper.scala |  53 ++++
 .../storm/JsonMessageDeserializer.scala         |  45 +++
 .../datastream/storm/KafkaStreamMonitor.scala   |  31 ++
 .../eagle/datastream/storm/MapBoltWrapper.scala |  66 +++++
 .../eagle/datastream/storm/SpoutProxy.scala     | 111 +++++++
 .../datastream/storm/StormBoltFactory.scala     |  53 ++++
 .../datastream/storm/StormBoltWrapper.scala     |  61 ++++
 .../storm/StormExecutionEnvironment.scala       |  42 +++
 .../storm/StormExecutorForAlertWrapper.scala    |  41 +++
 .../datastream/storm/StormSpoutFactory.scala    |  67 +++++
 .../storm/StormTopologyCompiler.scala           | 118 ++++++++
 .../storm/StormTopologyExecutorImpl.scala       |  71 +++++
 .../utils/AlertExecutorConsumerUtils.scala      |  76 +++++
 .../eagle/datastream/utils/GraphPrinter.scala   |  59 ++++
 .../eagle/datastream/utils/NameConstants.scala  |  24 ++
 .../datastream/utils/NodeNameSelector.scala     |  28 ++
 .../eagle/datastream/utils/ReflectionS.scala    |  55 ++++
 .../eagle/datastream/utils/UnionUtils.scala     |  43 +++
 .../TestExecutionEnvironmentJava.java           |  41 +++
 .../apache/eagle/datastream/TestJavaMain.java   |  30 +-
 .../datastream/TestJavaReflectionUtils.java     |  36 +++
 .../datastream/TestKafkaStreamMonitor.java      |   3 +-
 .../eagle/datastream/TestDAGExpansion.scala     |  40 ++-
 .../datastream/TestExecutionEnvironment.scala   |  34 +++
 .../eagle/datastream/TestStormRunner.scala      |  62 ++--
 .../eagle/datastream/TestTypeSafedDSL.scala     |  87 ++++++
 .../org/apache/eagle/datastream/JavaMapper.java |  26 ++
 .../eagle/datastream/JavaTypeCompatible.java    |  24 ++
 .../apache/eagle/datastream/EagleTuple.scala    |  20 +-
 .../apache/eagle/datastream/FlatMapper.scala    |   4 +-
 .../eagle/datastream/StormStreamExecutor.scala  |   9 +-
 .../service/client/EagleServiceConnector.java   |   4 +-
 .../metric/kafka/EagleMetricCollectorMain.java  |  23 +-
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |   7 +-
 .../hdfs/entity/FileSensitivityAPIEntity.java   |   2 +-
 .../partition/DataDistributionDaoImpl.java      |   1 -
 .../hbase/HbaseAuditLogProcessorMain.java       |  25 +-
 ...baseResourceSensitivityDataJoinExecutor.java |   1 -
 .../run_auditlog_topology.sh                    |  15 +
 .../run_hostname_lookkup.sh                     |  15 +
 .../run_message_producer.sh                     |  15 +
 .../run_message_producer_in_assembly.sh         |  15 +
 .../assembly/eagle-dam-auditlog-assembly.xml    |  17 ++
 .../auditlog/HdfsAuditLogProcessorMain.java     |  44 ++-
 .../auditlog/HdfsUserCommandReassembler.java    |  15 +-
 .../src/main/resources/application.conf         |   2 +-
 .../src/main/resources/auditlog/auditlog.1      |  16 -
 .../main/resources/security-auditlog-storm.yaml |  15 +
 .../HDFSSecurityLogProcessorMain.java           |  27 +-
 .../service/security/hdfs/HDFSFileSystem.java   |  17 +-
 .../HiveJobRunningMonitoringMain.java           |  23 +-
 .../UserProfileDetectionBatchMain.java          |  18 +-
 .../UserProfileDetectionStreamMain.java         |  24 +-
 test.txt                                        |   1 -
 121 files changed, 3522 insertions(+), 2350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
index dd18af7..4816bc9 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
@@ -46,4 +46,4 @@ public class EmailNotificationConfig extends NotificationConfig{
 	public void setTplFileName(String tplFileName) {
 		this.tplFileName = tplFileName;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index d86a846..7e4372c 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -409,4 +409,4 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 			}
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
deleted file mode 100644
index 1332887..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.typesafe.config.Config;
-
-/**
- * Normally storm spout is a special part of storm topology and it is implemented in underlying spout implementation
- * which can be retrieved from getSpout method.
- */
-public abstract class AbstractStormSpoutProvider{
-	public abstract BaseRichSpout getSpout(Config context);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
new file mode 100644
index 0000000..ab90ad7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm;
+
+import backtype.storm.topology.base.BaseRichSpout;
+
+import com.typesafe.config.Config;
+
+/**
+ * Normally a Storm spout is a special part of a Storm topology; it is implemented by an underlying spout implementation
+ * which can be retrieved from the getSpout method.
+ */
+public interface StormSpoutProvider {
+	public BaseRichSpout getSpout(Config context);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
index 75305f1..2323f39 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
@@ -14,16 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.eagle.dataproc.impl.storm.hdfs;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.topology.base.BaseRichSpout;
 
-public class HDFSSourcedStormSpoutProvider extends AbstractStormSpoutProvider {
+public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider {
 	private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class);
 	
 	public abstract static class HDFSSpout extends BaseRichSpout{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 373b3ca..53537d7 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -28,9 +28,9 @@ import storm.kafka.ZkHosts;
 import backtype.storm.spout.SchemeAsMultiScheme;
 import backtype.storm.topology.base.BaseRichSpout;
 
-import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
 
-public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
+public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
     private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
 
 	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
@@ -89,7 +89,6 @@ public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
 		}
 		
 		spoutConfig.scheme = getStreamScheme(deserClsName, context);
-		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
-		return kafkaSpout;
+        return new KafkaSpout(spoutConfig);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index 8b65c1f..faeb7c3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -19,12 +19,12 @@ package org.apache.eagle.dataproc.impl.storm.kafka;
 import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
+import org.apache.eagle.datastream.utils.NameConstants;
 
 import java.lang.reflect.Constructor;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
-import java.util.Map;
 
 /**
  * This scheme defines how a kafka message is deserialized and the output field name for storm stream
@@ -56,10 +56,16 @@ public class KafkaSourcedSpoutScheme implements Scheme {
 		// the following tasks are executed within the same process of kafka spout
 		return Arrays.asList(tmp);
 	}
-	
+
+    /**
+     * Defaults to a single field, f0; must be overridden if the output fields differ
+     *
+     * TODO: Handle the schema with KeyValue based structure
+     *
+     * @return Fields
+     */
 	@Override
 	public Fields getOutputFields() {
-//		return new Fields(deserializer.getOutputFields());
-		throw new UnsupportedOperationException("output fields should be declared in sub class of KafkaSourcedSpoutProvider");
+        return new Fields(NameConstants.FIELD_PREFIX()+"0");
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
new file mode 100644
index 0000000..9c1ab68
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
@@ -0,0 +1,51 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import org.apache.eagle.partition.PartitionStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomPartitionGrouping implements CustomStreamGrouping {
+
+    public List<Integer> targetTasks;
+    public PartitionStrategy strategy;
+
+    public CustomPartitionGrouping(PartitionStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = new ArrayList<>(targetTasks);
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        int numTasks = targetTasks.size();
+        int targetTaskIndex = strategy.balance((String)values.get(0), numTasks);
+        return Arrays.asList(targetTasks.get(targetTaskIndex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
deleted file mode 100644
index 96e42b7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.dataproc.impl.storm.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class EagleCustomGrouping implements CustomStreamGrouping {
-
-    public List<Integer> targetTasks;
-    public PartitionStrategy strategy;
-
-    public EagleCustomGrouping(PartitionStrategy strategy) {
-        this.strategy = strategy;
-    }
-
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        this.targetTasks = new ArrayList<>(targetTasks);
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        int numTasks = targetTasks.size();
-        int targetTaskIndex = strategy.balance((String)values.get(0), numTasks);
-        return Arrays.asList(targetTasks.get(targetTaskIndex));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
deleted file mode 100644
index 7e66478..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-
-public interface JavaMapper {
-    List<Object> map(List<Object> input);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
index 04a80e5..993a4a2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
@@ -29,6 +29,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import org.apache.eagle.datastream.utils.NameConstants;
 
 public class JavaMapperStormExecutor extends BaseRichBolt{
     public static class e1 extends JavaMapperStormExecutor {
@@ -75,7 +76,7 @@ public class JavaMapperStormExecutor extends BaseRichBolt{
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         List<String> fields = new ArrayList<String>();
         for(int i=0; i<numOutputFields; i++){
-            fields.add(OutputFieldNameConst.FIELD_PREFIX() + i);
+            fields.add(NameConstants.FIELD_PREFIX() + i);
         }
         declarer.declare(new Fields(fields));
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
index 3aacb32..331e51a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
@@ -52,4 +52,5 @@ public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<S
         };
         delegate.flatMap(input, delegateCollector);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
new file mode 100644
index 0000000..04b4bed
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.utils;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * @since 12/7/15
+ */
+class JavaReflections {
+    @SuppressWarnings("unchecked")
+    public static Class<?> getGenericTypeClass(final Object obj,int index) {
+        return (Class<?>) ((ParameterizedType) obj
+                .getClass()
+                .getGenericSuperclass()).getActualTypeArguments()[index];
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
deleted file mode 100644
index dc2c198..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-trait AbstractStreamProducerGraph {
-  def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector)
-  def addVertex(producer: StreamProducer)
-  def iterator() : Iterator[StreamProducer]
-  def isSource(v : StreamProducer) : Boolean
-  def outgoingEdgesOf(v : StreamProducer) : scala.collection.mutable.Set[StreamConnector]
-  def getNodeByName(name : String) : Option[StreamProducer]
-  def incomingVertexOf(v: StreamProducer) : scala.collection.mutable.Set[StreamProducer]
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
deleted file mode 100644
index 8c53ed5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait AbstractTopologyCompiler{
-  def buildTopology : AbstractTopologyExecutor
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
deleted file mode 100644
index 9ac3cc9..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait AbstractTopologyExecutor {
-  def execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
deleted file mode 100644
index cb6ac4f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import java.util
-
-import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
-import org.apache.eagle.alert.notification.AlertNotificationExecutor
-import org.apache.eagle.alert.persist.AlertPersistExecutor
-import org.apache.eagle.executor.AlertExecutor
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.mutable.ListBuffer
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-
-object AlertExecutorConsumerUtils {
-  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
-
-  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector], alertStreamProducers: List[StreamProducer]): Unit = {
-    var alertExecutorIdList : java.util.List[String] = new util.ArrayList[String]()
-    alertStreamProducers.map(x =>
-      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertExecutorId));
-    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertDefinitionDao
-    val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
-    val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
-
-    val entityDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),entityDedupExecutor)
-    val persistStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),persistExecutor)
-    val emailDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),emailDedupExecutor)
-    val notificationStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),notificationExecutor)
-    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
-    toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
-
-    alertStreamProducers.foreach(sp => {
-      toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
-      toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
deleted file mode 100644
index 488d52a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-object ExecutionEnvironmentFactory{
-
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-}
-
-abstract class ExecutionEnvironment(config : Config){
-  def execute()
-}
-
-class StormExecutionEnvironment(config: Config) extends ExecutionEnvironment(config){
-  val LOG = LoggerFactory.getLogger(classOf[StormExecutionEnvironment])
-  val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
-
-  override def execute() : Unit = {
-    LOG.info("initial graph:\n")
-    GraphPrinter.print(dag)
-    new StreamAlertExpansion(config).expand(dag)
-    LOG.info("after StreamAlertExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamUnionExpansion(config).expand(dag)
-    LOG.info("after StreamUnionExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamGroupbyExpansion(config).expand(dag)
-    LOG.info("after StreamGroupbyExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamNameExpansion(config).expand(dag)
-    LOG.info("after StreamNameExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamParallelismConfigExpansion(config).expand(dag)
-    LOG.info("after StreamParallelismConfigExpansion graph:")
-    GraphPrinter.print(dag)
-    val stormDag = StormStreamDAGTransformer.transform(dag)
-    StormTopologyCompiler(config, stormDag).buildTopology.execute
-  }
-
-  def newSource(source: BaseRichSpout): StormSourceProducer ={
-    val ret = StormSourceProducer(UniqueId.incrementAndGetId(), source)
-    ret.config = config
-    ret.graph = dag
-    dag.addVertex(ret)
-    ret
-  }
-
-  def newSource(sourceProvider: AbstractStormSpoutProvider):StormSourceProducer = newSource(sourceProvider.getSpout(config))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
new file mode 100644
index 0000000..7e5f271
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+import scala.reflect.runtime.universe._
+
+/**
+ * Execution environment factory
+ *
+ * The factory is mainly used to create or manage execution environments,
+ * and also handles shared work such as configuration and arguments for an execution environment
+ *
+ * Notice: this factory class should not know any implementation like storm or spark
+ *
+ * @since 0.3.0
+ */
+object ExecutionEnvironments{
+  /**
+   * Creates a storm execution environment from the given config.
+   *
+   * Use `'''get[StormExecutionEnvironment](config)'''` instead
+   *
+   * @param config config passed to the [[StormExecutionEnvironment]] constructor
+   * @return a new [[StormExecutionEnvironment]]
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(config : Config): StormExecutionEnvironment = new StormExecutionEnvironment(config)
+
+  /**
+   * Creates a storm execution environment from `ConfigFactory.load()`.
+   *
+   * Use `'''get[StormExecutionEnvironment]'''` instead
+   *
+   * @return a new [[StormExecutionEnvironment]]
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm:StormExecutionEnvironment = {
+    getStorm(ConfigFactory.load())
+  }
+
+  /**
+   * Creates a storm execution environment from command arguments,
+   * which are loaded into a config through `ConfigOptionParser`.
+   *
+   * Use `'''get[StormExecutionEnvironment](args)'''` instead
+   *
+   * @see get[StormExecutionEnvironment](args)
+   *
+   * @param args command arguments in string array
+   * @return a new [[StormExecutionEnvironment]]
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(args:Array[String]):StormExecutionEnvironment = {
+    getStorm(new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Creates an execution environment of type `T` from `ConfigFactory.load()`.
+   *
+   * @param typeTag type tag used to resolve the runtime class of `T`
+   * @tparam T execution environment type
+   * @return a new instance of `T`
+   */
+  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
+    get[T](ConfigFactory.load())
+  }
+
+  /**
+   * Creates an execution environment of type `T` from the given config.
+   *
+   * The runtime class of `T` is resolved through the type tag's mirror and the
+   * instance is then built by the class based `get(config, clazz)` overload.
+   *
+   * @param config config passed to the constructor of `T`
+   * @param typeTag type tag used to resolve the runtime class of `T`
+   * @tparam T execution environment type
+   * @return a new instance of `T`
+   */
+  def get[T<:ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
+    // Keep the reflective construction in one place: the (config, clazz) overload
+    get[T](config, typeTag.mirror.runtimeClass(typeOf[T]).asInstanceOf[Class[T]])
+  }
+
+  /**
+   * Creates an execution environment of type `T` from command arguments,
+   * which are loaded into a config through `ConfigOptionParser`.
+   *
+   * @param args command arguments in string array
+   * @param typeTag type tag used to resolve the runtime class of `T`
+   * @tparam T execution environment type
+   * @return a new instance of `T`
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
+    get[T](new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Support java style for default config
+   *
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new instance of `clazz` built from `ConfigFactory.load()`
+   */
+  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
+    get[T](ConfigFactory.load(),clazz)
+  }
+
+  /**
+   * Support java style
+   *
+   * Instantiates `clazz` reflectively, so it must declare a public constructor
+   * taking a single `Config`; otherwise `getConstructor` fails with `NoSuchMethodException`.
+   *
+   * @param config command config
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new instance of `clazz`
+   */
+  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(config)
+  }
+
+  /**
+   * Support java style
+   *
+   * @param args command arguments in string array
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new instance of `clazz` built from the config loaded from `args`
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
+    get[T](new ConfigOptionParser().load(args),clazz)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
deleted file mode 100644
index d31759b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-case class FilterBoltWrapper[T](fn : T => Boolean) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-
-  override def execute(input : Tuple): Unit = {
-    input.getValue(0) match {
-      case v:T =>
-        if(fn(v)){
-          _collector.emit(input, input.getValues)
-          _collector.ack(input)
-        }
-    }
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    declarer.declare(new Fields(OutputFieldNameConst.FIELD_PREFIX + "0"))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
deleted file mode 100644
index 1925a89..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-
-object GraphPrinter {
-  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
-  def print(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={
-    val iter = dag.iterator()
-    while(iter.hasNext) {
-      val current = iter.next()
-      dag.outgoingEdgesOf(current).foreach(edge => {
-        LOG.info(edge.from + "{" + edge.from.parallelism + "}" +" => " + edge.to + "{" + edge.to.parallelism + "}" + " with groupByFields " + edge.groupByFields)
-      })
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
deleted file mode 100644
index 8ebfd7b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  override def execute(input : Tuple): Unit ={
-    worker.flatMap(input.getValues, new Collector[EagleTuple](){
-      def collect(t: EagleTuple): Unit ={
-        _collector.emit(input, t.getList.asJava)
-      }
-    })
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
deleted file mode 100644
index 99fa32a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-/**
- * @since  9/29/15
- */
-case class MapBoltWrapper[T,R](num: Int, fn: T => R) extends BaseRichBolt {
-  val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    var fields = new util.ArrayList[String]()
-    var i : Int = 0;
-    while(i < num){
-      fields.add(OutputFieldNameConst.FIELD_PREFIX + i)
-      i += 1
-    }
-    declarer.declare(new Fields(fields))
-  }
-
-  override def execute(input: Tuple): Unit = {
-    val size = input.size()
-    var values : AnyRef = null
-    size match {
-      case 1 => values = scala.Tuple1(input.getValue(0))
-      case 2 => values = scala.Tuple2(input.getValue(0), input.getValue(1))
-      case 3 => values = scala.Tuple3(input.getValue(0), input.getValue(1), input.getValue(2))
-      case 4 => values = scala.Tuple4(input.getValue(0), input.getValue(1), input.getValue(2), input.getValue(3))
-      case _ => throw new IllegalArgumentException
-    }
-    val output = fn(values.asInstanceOf[T])
-    output match {
-      case scala.Tuple1(a) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
-      case scala.Tuple2(a, b) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef]))
-      case scala.Tuple3(a, b, c) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef]))
-      case scala.Tuple4(a, b, c, d) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef]))
-      case a => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
-    }
-    _collector.ack(input)
-  }
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
deleted file mode 100644
index 8b06322..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-case class NodeNameSelector(producer : StreamProducer) {
-  def getName : String = {
-    producer.name match {
-      case null => producer.toString
-      case _ => producer.name
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
deleted file mode 100644
index 64659b7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-object OutputFieldNameConst {
-  val FIELD_PREFIX = "f"
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
deleted file mode 100644
index aca3b5b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.tuple.Fields
-
-/**
- * Declare delegated BaseRichSpout with given field names
- *
- * @param delegate delegated BaseRichSpout
- * @param outputFields given field names
- */
-case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
-  def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector) {
-    this.delegate.open(conf, context, collector)
-  }
-
-  def nextTuple {
-    this.delegate.nextTuple
-  }
-
-  override def ack(msgId: AnyRef) {
-    this.delegate.ack(msgId)
-  }
-
-  override def fail(msgId: AnyRef) {
-    this.delegate.fail(msgId)
-  }
-
-  override def deactivate {
-    this.delegate.deactivate
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer) {
-    declarer.declare(new Fields(outputFields))
-  }
-
-  override def close {
-    this.delegate.close
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
deleted file mode 100644
index 2a2e268..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import backtype.storm.topology.base.BaseRichBolt
-import com.typesafe.config.Config
-
-object StormBoltFactory {
-  def getBoltWrapper(graph: AbstractStreamProducerGraph, producer : StreamProducer, config : Config) : BaseRichBolt = {
-    producer match{
-      case FlatMapProducer(id, worker) => {
-        if(worker.isInstanceOf[JavaStormStreamExecutor[EagleTuple]]){
-          worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]].prepareConfig(config)
-          JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
-        }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){
-          worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config)
-          StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]])
-        }else {
-          throw new UnsupportedOperationException
-        }
-      }
-      case FilterProducer(id, fn) => {
-        FilterBoltWrapper(fn)
-      }
-      case MapProducer(id, n, fn) => {
-        MapBoltWrapper(n, fn)
-      }
-      case _ => throw new UnsupportedOperationException
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
deleted file mode 100644
index 7f27483..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.task.{OutputCollector, TopologyContext}
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.tuple.{Fields, Tuple}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{
-  val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
-  var _collector : OutputCollector = null
-
-  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
-    _collector = collector
-    worker.init
-  }
-
-  override def execute(input : Tuple): Unit = {
-    try {
-      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
-        override def collect(t: EagleTuple): Unit = {
-          _collector.emit(input, t.getList.asJava)
-        }
-      })
-    }catch{
-      case ex: Exception => {
-        LOG.error("fail executing", ex)
-        _collector.fail(input)
-        throw new RuntimeException(ex)
-      }
-    }
-    _collector.ack(input)
-  }
-
-  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
-    val fields = worker.fields
-    LOG.info("output fields for worker " + worker + " : " + fields.toList)
-    declarer.declare(new Fields(fields:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
deleted file mode 100644
index 6ff1d52..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import java.util
-
-import com.typesafe.config.Config
-
-case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
-  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
-  override def prepareConfig(config: Config): Unit = {
-    delegate.prepareConfig(config)
-  }
-
-  override def init: Unit = {
-    delegate.init
-  }
-
-  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
-    delegate.flatMap(input, new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
-      override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit = {
-        collector.collect(Tuple3(r.f0, streamName, r.f1))
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
deleted file mode 100644
index dcb51fd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.Config
-
-object StormSpoutFactory {
-  /**
-   * @param config context configuration
-   * @param sourceProducer source producer
-   * @return
-   */
-  def createSpout(config: Config, sourceProducer: StormSourceProducer) : BaseRichSpout = {
-    val numFields = sourceProducer.numFields
-    if(numFields <= 0) {
-      sourceProducer.source
-    }else{
-      var i = 0
-      val ret = new util.ArrayList[String]
-      while(i < numFields){
-        ret.add(OutputFieldNameConst.FIELD_PREFIX + i)
-        i += 1
-      }
-      SpoutProxy(sourceProducer.source, ret)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
deleted file mode 100644
index f4129ae..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-
-import scala.collection.JavaConverters._
-import scala.collection.{JavaConversions, mutable}
-
-/**
- * wrapper of DAG, used for storm topology compiler
- */
-class StormStreamDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]) extends AbstractStreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer] = null
-
-  override def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector): Unit = {
-    graph.addEdge(from, to, streamConnector)
-  }
-
-  override def addVertex(producer: StreamProducer): Unit = {
-    graph.addVertex(producer)
-  }
-
-  override def iterator(): Iterator[StreamProducer] = {
-    JavaConversions.asScalaIterator(graph.iterator())
-  }
-
-  override def isSource(v: StreamProducer): Boolean = {
-    graph.inDegreeOf(v) match {
-      case 0 => true
-      case _ => false
-    }
-  }
-
-  override def outgoingEdgesOf(v: StreamProducer): scala.collection.mutable.Set[StreamConnector] = {
-    JavaConversions.asScalaSet(graph.outgoingEdgesOf(v))
-  }
-
-  override def getNodeByName(name: String): Option[StreamProducer] = {
-    nodeMap.get(name)
-  }
-
-  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer]): Unit = {
-    this.nodeMap = nodeMap
-  }
-
-  override def incomingVertexOf(v: StreamProducer): scala.collection.mutable.Set[StreamProducer] = {
-    val set = mutable.Set[StreamProducer]()
-    graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
-    set
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
deleted file mode 100644
index 254d84b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream
-
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import scala.collection.mutable
-
-/**
- * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler
- */
-object StormStreamDAGTransformer {
-  /**
-   * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG
-   *
-   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
-   * @return StormStreamDAG
-   */
-  def transform(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) : StormStreamDAG = {
-    val stormDAG = new StormStreamDAG(dag)
-    val nodeMap = mutable.HashMap[String, StreamProducer]()
-    val iter = dag.iterator()
-    while(iter.hasNext){
-      val sp = iter.next()
-      nodeMap.put(sp.name, sp)
-    }
-    stormDAG.setNodeMap(nodeMap)
-    stormDAG
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
deleted file mode 100644
index 4f9fccc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.datastream
-
-import java.util
-
-import backtype.storm.topology.base.BaseRichBolt
-import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
-import backtype.storm.tuple.Fields
-import com.typesafe.config.Config
-import org.apache.eagle.dataproc.impl.storm.partition.EagleCustomGrouping
-import org.slf4j.LoggerFactory
-
-case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
-  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
-  val boltCache = scala.collection.mutable.Map[StreamProducer, StormBoltWrapper]()
-
-  override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder();
-    val iter = graph.iterator()
-    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
-    while(iter.hasNext){
-      val from = iter.next()
-      val fromName = from.name
-      if(graph.isSource(from)){
-        val spout = StormSpoutFactory.createSpout(config, from.asInstanceOf[StormSourceProducer])
-        builder.setSpout(fromName, spout, from.parallelism)
-        LOG.info("Spout name : " + fromName + " with parallelism " + from.parallelism)
-      } else {
-        LOG.info("Bolt name:" + fromName)
-      }
-
-      val edges = graph.outgoingEdgesOf(from)
-      edges.foreach(sc => {
-        val toName = sc.to.name
-        var boltDeclarer : BoltDeclarer = null
-        val toBolt = createBoltIfAbsent(toName)
-        boltDeclarerCache.get(toName) match{
-          case None => {
-            var finalParallelism = 1
-            graph.getNodeByName(toName) match {
-              case Some(p) => finalParallelism = p.parallelism
-              case None => finalParallelism = 1
-            }
-            boltDeclarer = builder.setBolt(toName, toBolt, finalParallelism);
-            LOG.info("created bolt " + toName + " with parallelism " + finalParallelism)
-            boltDeclarerCache.put(toName, boltDeclarer)
-          }
-          case Some(bt) => boltDeclarer = bt
-        }
-        if (sc.groupByFields != Nil) {
-          boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
-        }
-        else if (sc.customGroupBy != null) {
-          boltDeclarer.customGrouping(fromName, new EagleCustomGrouping(sc.customGroupBy));
-        }
-        else {
-          boltDeclarer.shuffleGrouping(fromName);
-        }
-/*        sc.groupByFields match{
-          case Nil => boltDeclarer.shuffleGrouping(fromName)
-          case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
-        }*/
-        LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
-      })
-    }
-    new StormTopologyExecutorImpl(builder.createTopology, config)
-  }
-
-  def fields(fields : Seq[Int]): java.util.List[String] ={
-    val ret = new util.ArrayList[String]
-    fields.map(n => ret.add(OutputFieldNameConst.FIELD_PREFIX + n))
-    ret
-  }
-
-  def createBoltIfAbsent(name : String) : BaseRichBolt = {
-    val producer = graph.getNodeByName(name)
-    producer match{
-      case Some(p) => createBoltIfAbsent(graph, p)
-      case None => throw new IllegalArgumentException("please check bolt name " + name)
-    }
-  }
-
-  def createBoltIfAbsent(graph: AbstractStreamProducerGraph, producer : StreamProducer): BaseRichBolt ={
-    boltCache.get(producer) match{
-      case Some(bolt) => bolt
-      case None => {
-        StormBoltFactory.getBoltWrapper(graph, producer, config)
-      }
-    }
-  }
-}



Mime
View raw message