eagle-commits mailing list archives

From h..@apache.org
Subject [3/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner
Date Wed, 20 Jan 2016 06:48:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
new file mode 100644
index 0000000..e63280a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier}
+import org.scalatest.{FlatSpec, Matchers}
+
+class DataFlowSpec extends FlatSpec with Matchers {
+  val dataFlowConfig =
+    """
+       |{
+       |	kafkaSource.metric_event_1 {
+       |    schema {
+       |      metric: string
+       |      timestamp: long
+       |      value: double
+       |    }
+       |		parallism = 1000
+       |		topic = "metric_event_1"
+       |		zkConnection = "localhost:2181"
+       |		zkConnectionTimeoutMS = 15000
+       |		consumerGroupId = "Consumer"
+       |		fetchSize = 1048586
+       |		transactionZKServers = "localhost"
+       |		transactionZKPort = 2181
+       |		transactionZKRoot = "/consumers"
+       |		transactionStateUpdateMS = 2000
+       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+       |	}
+       |
+       |	kafkaSource.metric_event_2 {
+       |		schema = {
+       |      metric: string
+       |      timestamp: long
+       |      value: double
+       |    }
+       |		parallism = 1000
+       |		topic = "metric_event_2"
+       |		zkConnection = "localhost:2181"
+       |		zkConnectionTimeoutMS = 15000
+       |		consumerGroupId = "Consumer"
+       |		fetchSize = 1048586
+       |		transactionZKServers = "localhost"
+       |		transactionZKPort = 2181
+       |		transactionZKRoot = "/consumers"
+       |		transactionStateUpdateMS = 2000
+       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+       |	}
+       |
+       |	kafkaSink.metricStore {}
+       |
+       |	alert.alert {
+       |		executor = "alertExecutor"
+       |	}
+       |
+       |	aggregator.aggreator {
+       |		executor = "aggreationExecutor"
+       |	}
+       |
+       |	metric_event_1|metric_event_2 -> alert {}
+       |	metric_event_1|metric_event_2 -> metricStore {}
+       |}
+     """.stripMargin
+
+  DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in {
+    val config = ConfigFactory.parseString(dataFlowConfig)
+    config should not be null
+    val dataflow = DataFlow.parse(config)
+    dataflow should not be null
+    dataflow.getConnectors.size should be(4)
+    dataflow.getProcessors.size should be(5)
+  }
+
+  Identifier.getClass.toString should "parse as definition" in {
+    val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier]
+    defId.moduleType should be("kafka")
+  }
+
+  Identifier.getClass.toString should "parse node1 -> node2 as connection" in {
+    val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(1)
+  }
+
+  Identifier.getClass.toString should "parse node1|node2 -> node3" in {
+    val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(2)
+  }
+
+  Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in {
+    val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(3)
+  }
+
+  Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in {
+    val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(3)
+  }
+}
\ No newline at end of file

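The five Identifier specs above pin down the DSL's connection grammar: one or more source node names separated by "|", an arrow "->", then a target node. A minimal sketch of how that shape could be parsed (stand-in types; the real implementation lives in org.apache.eagle.stream.pipeline.parser):

    // Hypothetical stand-in for the parser's ConnectionIdentifier.
    case class ConnectionIdentifier(fromIds: Seq[String], toId: String)

    def parseConnection(text: String): ConnectionIdentifier = {
      // "node1 | node2 -> node3" => fromIds = Seq("node1", "node2"), toId = "node3"
      val Array(from, to) = text.split("->").map(_.trim)
      ConnectionIdentifier(from.split("\\|").map(_.trim).toSeq, to)
    }

    assert(parseConnection("node1|node2|node3 -> node4").fromIds.size == 3)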
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
new file mode 100644
index 0000000..c87475b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+import org.apache.eagle.datastream.ExecutionEnvironments.storm
+import org.scalatest.{FlatSpec, Matchers}
+
+class PipelineSpec extends FlatSpec with Matchers{
+  "Pipeline" should "parse successfully from pipeline_1.conf" in {
+    val pipeline = Pipeline.parseResource("pipeline_1.conf")
+    pipeline should not be null
+  }
+
+  "Pipeline" should "compile successfully from pipeline_2.conf" in {
+    val pipeline = Pipeline.parseResource("pipeline_2.conf")
+    pipeline should not be null
+    val stream = Pipeline.compile(pipeline)
+    stream should not be null
+    // Storm LocalCluster throws ClassNotFoundException when submitting from a unit test
+    // stream.submit[storm]
+  }
+}
+
+/**
+ * Storm LocalCluster throws ClassNotFoundException when submitting from a unit test, so submit from a standalone App instead
+ */
+object PipelineSpec_2 extends App{
+  val pipeline = Pipeline(args).parseResource("pipeline_2.conf")
+  val stream = Pipeline.compile(pipeline)
+  stream.submit[storm]
+}
+
+object PipelineSpec_3 extends App {
+  Pipeline(args).submit[storm]("pipeline_3.conf")
+}
+
+object PipelineSpec_4 extends App {
+  Pipeline(args).submit[storm]("pipeline_4.conf")
+}
+
+object PipelineCLISpec extends App{
+  Pipeline.main(args)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
index e66d3fb..48c832a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -58,11 +58,8 @@ public class AggregateExecutorFactory {
 	public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
 		StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
         int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
-        
-		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<AggregateDefinitionAPIEntity>(
+		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>(
 				new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
-		
-		
 		return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
 	}
 	
@@ -86,7 +83,6 @@ public class AggregateExecutorFactory {
         return numPartitions;
 	}
 
-
 //	private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
 //		// Get map from alertExecutorId to alert stream
 //		// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
@@ -104,12 +100,12 @@ public class AggregateExecutorFactory {
 	private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
 			List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
 					throws Exception {
-		LOG.info("Creating alert executors with executorID: " + executorID + ", numPartitions: "
+		LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: "
 				+ numPartitions + ", Partition class is: " + partitionerCls);
 
 		PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
 		AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
-		String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+		String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
 
 		for (int i = 0; i < numPartitions; i++) {
 			alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
@@ -117,5 +113,4 @@ public class AggregateExecutorFactory {
 		}
 		return alertExecutors;
 	}
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index 8c01935..e42fc48 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.dataproc.impl.aggregate;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created on 1/10/16.
@@ -68,10 +70,13 @@ public class SimpleAggregateExecutor
         aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
         // TODO make it more general, not only hard code siddhi cep support here.
         try {
-            String template = "{\"type\":\"siddhiCEPEngine\", \"expression\":\"%s\", \"containsDefintion\": true }";
-            aggDef.setPolicyDef(String.format(template, this.cql));
+            Map<String,Object> template = new HashMap<>();
+            template.put("type","siddhiCEPEngine");
+            template.put("expression",this.cql);
+            template.put("containsDefinition",true);
+            aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
         } catch (Exception e) {
-            LOG.error("simple aggregate generate policy definition failed!", e);
+            LOG.error("Simple aggregate generate policy definition failed!", e);
         }
         aggDef.setCreatedTime(new Date().getTime());
         aggDef.setLastModifiedDate(new Date().getTime());

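This hunk does two things: it fixes the misspelled "containsDefintion" key from the old template, and it replaces hand-built JSON with ObjectMapper so a CQL expression containing quotes or backslashes is escaped instead of producing invalid JSON. A quick illustration (hypothetical expression value):

    import com.fasterxml.jackson.databind.ObjectMapper

    val cql = "select * from stream having metric == \"cpu\"" // quotes would break a format template
    val template = new java.util.HashMap[String, Object]()
    template.put("type", "siddhiCEPEngine")
    template.put("expression", cql)
    template.put("containsDefinition", Boolean.box(true))
    // writeValueAsString escapes the embedded quotes as \" in the output
    println(new ObjectMapper().writer().writeValueAsString(template))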
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
new file mode 100644
index 0000000..416aaa3
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class JsonSerializer implements Serializer<Object> {
+    private final StringSerializer stringSerializer = new StringSerializer();
+    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
+    private static final ObjectMapper om = new ObjectMapper();
+
+    static {
+        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        stringSerializer.configure(configs,isKey);
+    }
+
+    @Override
+    public byte[] serialize(String topic, Object data) {
+        String str = null;
+        try {
+            str = om.writeValueAsString(data);
+        } catch (IOException e) {
+            logger.error("Kafka serialization for send error!", e);
+        }
+        return stringSerializer.serialize(topic, str);
+    }
+
+    @Override
+    public void close() {
+        stringSerializer.close();
+    }
+}

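Because the class implements Kafka's Serializer interface, it can be registered on a producer through the standard value.serializer property. A hedged usage sketch (broker address, topic, and payload are assumptions):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", classOf[JsonSerializer].getName)

    val event = new java.util.HashMap[String, Object]()
    event.put("metric", "cpu.usage") // assumed payload
    val producer = new KafkaProducer[String, Object](props)
    producer.send(new ProducerRecord[String, Object]("metric_event_1", event))
    producer.close()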
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 53537d7..0107a9b 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -37,26 +37,36 @@ public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
 		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
 	}
 
+    private String configPrefix = "dataSourceConfig";
+
+    public KafkaSourcedSpoutProvider(){}
+
+    public KafkaSourcedSpoutProvider(String prefix){
+        this.configPrefix = prefix;
+    }
+
 	@Override
-	public BaseRichSpout getSpout(Config context){
+	public BaseRichSpout getSpout(Config config){
+        Config context = config;
+        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
 		// Kafka topic
-		String topic = context.getString("dataSourceConfig.topic");
+		String topic = context.getString("topic");
 		// Kafka consumer group id
-		String groupId = context.getString("dataSourceConfig.consumerGroupId");
+		String groupId = context.getString("consumerGroupId");
 		// Kafka fetch size
-		int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+		int fetchSize = context.getInt("fetchSize");
 		// Kafka deserializer class
-		String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+		String deserClsName = context.getString("deserializerClass");
 		// Kafka broker zk connection
-		String zkConnString = context.getString("dataSourceConfig.zkConnection");
+		String zkConnString = context.getString("zkConnection");
 		// transaction zkRoot
-		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+		String zkRoot = context.getString("transactionZKRoot");
 
         LOG.info(String.format("Use topic id: %s",topic));
 
         String brokerZkPath = null;
-        if(context.hasPath("dataSourceConfig.brokerZkPath")) {
-            brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+        if(context.hasPath("brokerZkPath")) {
+            brokerZkPath = context.getString("brokerZkPath");
         }
 
         BrokerHosts hosts;
@@ -72,20 +82,20 @@ public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
 				groupId);
 		
 		// transaction zkServers
-		spoutConfig.zkServers = Arrays.asList(context.getString("dataSourceConfig.transactionZKServers").split(","));
+		spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
 		// transaction zkPort
-		spoutConfig.zkPort = context.getInt("dataSourceConfig.transactionZKPort");
+		spoutConfig.zkPort = context.getInt("transactionZKPort");
 		// transaction update interval
-		spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+		spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
 		// Kafka fetch size
 		spoutConfig.fetchSizeBytes = fetchSize;		
 		// "startOffsetTime" is for test usage, prod should not use this
-		if (context.hasPath("dataSourceConfig.startOffsetTime")) {
-			spoutConfig.startOffsetTime = context.getInt("dataSourceConfig.startOffsetTime");
+		if (context.hasPath("startOffsetTime")) {
+			spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
 		}		
 		// "forceFromStart" is for test usage, prod should not use this 
-		if (context.hasPath("dataSourceConfig.forceFromStart")) {
-			spoutConfig.forceFromStart = context.getBoolean("dataSourceConfig.forceFromStart");
+		if (context.hasPath("forceFromStart")) {
+			spoutConfig.forceFromStart = context.getBoolean("forceFromStart");
 		}
 		
 		spoutConfig.scheme = getStreamScheme(deserClsName, context);

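The net effect of this hunk: every lookup now goes through a Config subtree selected by a constructor-supplied prefix (defaulting to the old hard-coded "dataSourceConfig"), so one provider class can serve pipeline nodes whose Kafka settings live elsewhere in the configuration tree. A sketch (the prefix value is an assumption):

    // Default behaviour is unchanged: keys are read under dataSourceConfig.*
    val spout = new KafkaSourcedSpoutProvider().getSpout(config)

    // New: point the provider at another subtree, e.g. a pipeline source node
    val spout2 = new KafkaSourcedSpoutProvider("kafkaSource.metric_event_1").getSpout(config)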
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index faeb7c3..15401fd 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -38,7 +38,7 @@ public class KafkaSourcedSpoutScheme implements Scheme {
 	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
 		try{
 			Properties prop = new Properties();
-            if(context.getObject("eagleProps") != null) {
+            if(context.hasPath("eagleProps")) {
                 prop.putAll(context.getObject("eagleProps"));
             }
 			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);

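This one-line change is a real bug fix, not a style preference: Typesafe Config's getObject throws ConfigException.Missing for an absent path rather than returning null, so the old null check could never fire. hasPath is the supported way to probe an optional key; the same pattern guards envContextConfig.parallelismConfig later in this commit.

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString("{}")
    conf.hasPath("eagleProps")      // false — safe probe
    // conf.getObject("eagleProps") // throws ConfigException.Missing; never returns null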
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
deleted file mode 100644
index 7e5f271..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used for create or manage execution environment,
- * and also handles the shared works like configuration, arguments for execution environment
- *
- * Notice: this factory class should not know any implementation like storm or spark
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
-   *
-   * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    get[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    get[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  /**
-   * Support java style
-   * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
new file mode 100644
index 0000000..90e59cf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+import scala.reflect.runtime.universe._
+
+/**
+ * Execution environment factory
+ *
+ * The factory is mainly used to create and manage execution environments,
+ * and it also handles shared concerns such as configuration and arguments.
+ *
+ * Note: this factory class should not know about any concrete implementation such as Storm or Spark.
+ *
+ * @since 0.3.0
+ */
+object ExecutionEnvironments{
+  type storm = StormExecutionEnvironment
+
+  /**
+   * Use `'''get[StormExecutionEnvironment](config)'''` instead
+   *
+   * @param config
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(config : Config) = new StormExecutionEnvironment(config)
+
+  /**
+   * Use `'''get[StormExecutionEnvironment]'''` instead
+   *
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm:StormExecutionEnvironment = {
+    val config = ConfigFactory.load()
+    getStorm(config)
+  }
+
+  /**
+   * Use `'''get[StormExecutionEnvironment](args)'''` instead
+   *
+   * @see get[StormExecutionEnvironment](args)
+   * @param args
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(args:Array[String]):StormExecutionEnvironment = {
+    getStorm(new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
+    getWithConfig[T](ConfigFactory.load())
+  }
+
+  /**
+   *
+   * @param config
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def getWithConfig[T <: ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
+    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
+  }
+
+  /**
+   *
+   * @param args
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
+    getWithConfig[T](new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Support java style for default config
+   *
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
+    get[T](ConfigFactory.load(),clazz)
+  }
+
+  def get[T<:ExecutionEnvironment](clazz:Class[T], config:Config):T ={
+    get[T](config,clazz)
+  }
+
+  /**
+   * Support java style
+   *
+   * @param config command config
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(config)
+  }
+
+  /**
+   * Support java style
+   *
+   * @param args command arguments in string array
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
+  }
+}
\ No newline at end of file

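Besides renaming the file to match the object name, this version adds the storm type alias consumed by PipelineSpec above and renames the Config-taking overload to getWithConfig. Typical call sites, as a sketch (assumes an application.conf on the classpath; args comes from an enclosing App):

    import com.typesafe.config.ConfigFactory
    import org.apache.eagle.datastream.ExecutionEnvironments
    import org.apache.eagle.datastream.ExecutionEnvironments.storm

    val env1 = ExecutionEnvironments.get[storm]                          // ConfigFactory.load()
    val env2 = ExecutionEnvironments.get[storm](args)                    // CLI args via ConfigOptionParser
    val env3 = ExecutionEnvironments.getWithConfig[storm](ConfigFactory.load())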
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
index b189c57..c511484 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -19,64 +19,27 @@
 package org.apache.eagle.datastream.core
 
 import com.typesafe.config.Config
-import org.apache.eagle.datastream.utils.GraphPrinter
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+trait StreamContextAdapter{
+  def submit(context:StreamContext):Unit = {
+    execute(context.build)
+  }
+  def execute(dag: StreamDAG)
+}
 
 /**
+ * TODO: Decouple execution environment with stream context
+ *
  * @since 0.3.0
  */
-trait ExecutionEnvironment {
-  def config:Configuration
-
-  /**
-   * Business logic DAG
-   * @return
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-
+abstract class ExecutionEnvironment(private val conf:Config)
+  extends StreamContext(conf) with StreamContextAdapter     // Continue to support old API
+  with StreamSourceBuilder
+{
   /**
    * Start to execute
    */
-  def execute():Unit
-
-  /**
-   * Support Java Style Config
-   *
-   * @return
-   */
-  def getConfig:Config = config.get
-}
-
-/**
- * @todo Use Configuration instead of Config
- *
- * @param conf
- */
-abstract class ExecutionEnvironmentBase(private val conf:Config)  extends ExecutionEnvironment with StreamSourceBuilder {
-  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
-  private val _config:Configuration = Configuration(conf)
-
-  override def dag = _dag
-  override def config = _config
-
-  override def execute(): Unit = {
-    implicit val i_conf = _config.get
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="Before expanded DAG ")
-    StreamAggregateExpansion()
-    GraphPrinter.print(dag,message="after analyze expanded DAG ")
-    StreamAlertExpansion()
-    StreamUnionExpansion()
-    StreamGroupbyExpansion()
-    StreamParallelismConfigExpansion()
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="After expanded DAG ")
-
-    GraphPrinter.printDotDigraph(dag)
-
-    val streamDAG = StreamDAGTransformer.transform(dag)
-    execute(streamDAG)
+  def execute():Unit = {
+    submit(this)
   }
-
-  protected def execute(dag: StreamDAG)
 }
\ No newline at end of file

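After this refactoring an ExecutionEnvironment is itself a StreamContext, and the legacy execute() survives as a thin adapter: submit(this) builds the expanded StreamDAG via StreamContext.build and hands it to the environment's execute(dag). Existing callers keep working, sketched here:

    val env = ExecutionEnvironments.get[storm](args)
    // ... declare sources and operators on env (it is a StreamContext with StreamSourceBuilder) ...
    env.execute() // equivalent to env.submit(env): build the DAG, then execute(dag)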
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
index a95001b..ffb4b9e 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -42,7 +42,6 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
          */
         val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
 
-
         val analyzeExecutors = if (cepQl != null) {
           AggregateExecutorFactory.Instance.createExecutors(cepQl)
         } else {
@@ -50,7 +49,7 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
         }
 
         analyzeExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).nameAs(exec.getExecutorId() + "_" + exec.getPartitionSeq()).initWith(dag,config, hook = false)
+          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).initWith(dag,config, hook = false).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).stream(child.stream)
 
           // connect with previous
           if (strategy == null) {
@@ -70,7 +69,6 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
       case _ => 
     }
   }
-  
 }
 
 object StreamAggregateExpansion{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index c731ac9..1ef57cc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -54,6 +54,7 @@ import com.typesafe.config.Config
 
 case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
   val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
+  import StreamAlertExpansion._
 
   override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={
     val iter = dag.iterator()
@@ -78,7 +79,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
   def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], 
                dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
     child match {
-      case AlertStreamSink(upStreamNames, alertExecutorId, withConsumer,strategy) => {
+      case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) => {
         /**
          * step 1: wrapper previous StreamProducer with one more field "streamName"
          * for AlertStreamSink, we check previous StreamProducer and replace that
@@ -114,7 +115,10 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
     }
   }
 
-  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {/**
+  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {
+    if(upStreamNames == null) throw new NullPointerException("upStreamNames is null")
+
+    /**
      * step 1: wrapper previous StreamProducer with one more field "streamName"
      * for AlertStreamSink, we check previous StreamProducer and replace that
      */
@@ -129,20 +133,22 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
           i += 1
         })
       }
-      case _: FlatMapProducer[AnyRef, AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      case p: FlatMapProducer[AnyRef, AnyRef] => {
+        if(upStreamNames.size()>1) throw new IllegalStateException("More than 1 upStreamNames "+upStreamNames+" found for "+p)
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
       }
-      case _: MapperProducer[AnyRef,AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      case p: MapperProducer[AnyRef,AnyRef] => {
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
       }
       case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames))
       }
       case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
     }
     newStreamProducers
   }
 
+
   protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
                       dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
     var newsp: StreamProducer[Any] = null
@@ -152,11 +158,11 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
         mapper match {
           case a: JavaStormStreamExecutor[EagleTuple] => {
             val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }
           case b: StormStreamExecutor[EagleTuple] => {
             val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }
           case _ => throw new IllegalArgumentException
         }
@@ -176,8 +182,8 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
           }
         }
         current match {
-          case MapperProducer(2, fn) => newsp = MapperProducer(3, newfun)
-          case _ => throw new IllegalArgumentException
+          case MapperProducer(_, fn) => newsp = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream)
+          case _ => throw new IllegalArgumentException(s"Illegal producer $current")
         }
         val incomingEdges = dag.incomingEdgesOf(current)
         incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
@@ -196,7 +202,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
             }
           }
         }
-        newsp = MapperProducer(3,fn)
+        newsp = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream)
         toBeAddedEdges += StreamConnector(current,newsp)
         val outgoingEdges = dag.outgoingEdgesOf(current)
         outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
@@ -213,6 +219,31 @@ object StreamAlertExpansion{
     e.expand(dag)
     e
   }
+
+  /**
+    * Prefer a single upStreamName when given, otherwise fall back to producer.streamId
+    *
+    * @param producer
+    * @param upStreamNames
+    * @return
+    */
+  private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = {
+    if(upStreamNames == null){
+      producer.streamId
+    } else if(upStreamNames.size() > 1){
+      // Multiple candidate names: the producer must declare its own streamId
+      if(producer.streamId == null)
+        throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer)
+      producer.streamId
+    } else if(upStreamNames.size() == 1){
+      upStreamNames.get(0)
+    } else {
+      // Empty list: fall back to the producer's own streamId
+      if(producer.streamId == null)
+        throw new IllegalArgumentException("No stream name found for "+producer)
+      producer.streamId
+    }
+  }
 }
 
 

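For readers skimming the hunk, the resolution matrix implemented by recognizeSingleStreamName:

    // producer.streamId   upStreamNames   result
    // any                 null            producer.streamId
    // non-null            size > 1        producer.streamId
    // null                size > 1        IllegalStateException
    // any                 size == 1       upStreamNames.get(0)
    // non-null            empty           producer.streamId
    // null                empty           IllegalArgumentException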
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
new file mode 100644
index 0000000..6e21bcc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.ExecutionEnvironments
+import org.apache.eagle.datastream.utils.GraphPrinter
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.reflect.runtime.universe._
+
+trait StreamContextBuilder extends StreamSourceBuilder {
+  def config:Configuration
+  /**
+   * Business logic DAG
+   * @return
+   */
+  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
+  /**
+   * Support Java Style Config
+   *
+   * @return
+   */
+  def getConfig:Config = config.get
+  def build:StreamDAG
+  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit
+  def submit(env:ExecutionEnvironment):Unit
+  def submit(clazz:Class[ExecutionEnvironment]):Unit
+}
+
+class StreamContext(private val conf:Config) extends StreamContextBuilder{
+  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
+  private val _config:Configuration = Configuration(conf)
+  override def dag = _dag
+  override def config = _config
+  override def build: StreamDAG = {
+    implicit val i_conf = _config.get
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="Before expanded DAG ")
+    StreamAggregateExpansion()
+    StreamAlertExpansion()
+    StreamUnionExpansion()
+    StreamGroupbyExpansion()
+    StreamParallelismConfigExpansion()
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="After expanded DAG ")
+    GraphPrinter.printDotDigraph(dag)
+    StreamDAGTransformer.transform(dag)
+  }
+
+  override def submit(env: ExecutionEnvironment): Unit = {
+    env.submit(this)
+  }
+
+  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
+    ExecutionEnvironments.get(clazz,conf).submit(this)
+  }
+
+  override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = {
+    ExecutionEnvironments.getWithConfig[E](conf).submit(this)
+  }
+}
+
+object StreamContext {
+  /**
+   * @return
+   */
+  def apply():StreamContext = {
+    new StreamContext(ConfigFactory.load())
+  }
+
+  /**
+   *
+   * @param args
+   * @return
+   */
+  def apply(args:Array[String]):StreamContext ={
+    new StreamContext(new ConfigOptionParser().load(args))
+  }
+}
\ No newline at end of file

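StreamContextBuilder now carries the DAG-building surface that previously lived on ExecutionEnvironmentBase, so a context can be assembled once and submitted to any environment. A usage sketch (source and operator declarations elided; args comes from an enclosing App):

    import org.apache.eagle.datastream.ExecutionEnvironments.storm
    import org.apache.eagle.datastream.core.StreamContext

    val context = StreamContext(args) // config parsed via ConfigOptionParser
    // ... attach sources through StreamSourceBuilder, then transform them ...
    context.submit[storm]             // resolve the environment by type and submit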
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
index 255f031..7845740 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
@@ -27,7 +27,8 @@ import scala.collection.{JavaConversions, mutable}
  * wrapper of DAG, used for storm topology compiler
  */
 class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer[Any]] = null
+  var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]()
+  graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p))
 
   override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = {
     graph.addEdge(from, to, streamConnector)
@@ -35,6 +36,7 @@ class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConne
 
   override def addVertex(producer: StreamProducer[Any]): Unit = {
     graph.addVertex(producer)
+    nodeMap.put(producer.name,producer)
   }
 
   override def iterator(): Iterator[StreamProducer[Any]] = {
@@ -65,4 +67,4 @@ class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConne
     graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
     set
   }
-}
+}
\ No newline at end of file

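A subtle but important change: nodeMap used to start as null and be populated externally by StreamDAGTransformer; now it is built eagerly from the graph on construction and kept in sync by addVertex, consistent with the @deprecated note on StreamDAGTransformer.transform in the next hunk. Sketch of the invariant (hypothetical producers):

    import org.jgrapht.experimental.dag.DirectedAcyclicGraph
    import org.apache.eagle.datastream.core.{StreamConnector, StreamDAG, StreamProducer}

    val dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]](classOf[StreamConnector[Any, Any]])
    // ... add producers to dag ...
    val streamDAG = new StreamDAG(dag) // nodeMap is populated on construction
    // streamDAG.addVertex(p) keeps nodeMap consistent with the graph afterwards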
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
index 32947b9..6b20bf2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
@@ -31,6 +31,7 @@ object StreamDAGTransformer {
    * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
    * @return StormStreamDAG
    */
+  @deprecated("Use StreamDAG(dag) will transform directly")
   def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = {
     val stormDAG = new StreamDAG(dag)
     val nodeMap = mutable.HashMap[String, StreamProducer[Any]]()

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
index e01ffa4..8699da6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
@@ -45,9 +45,13 @@ case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExp
   }
 
   private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
-    parallelismConfig.asScala.toMap map {
-      case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+    if(config.hasPath("envContextConfig.parallelismConfig")) {
+      val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
+      parallelismConfig.asScala.toMap map {
+        case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+      }
+    }else{
+      Map[Pattern,Int]()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 4d81424..18acd9c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -1,20 +1,18 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.eagle.datastream.core
 
@@ -24,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
 import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream.FlatMapper
+import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
 import org.apache.eagle.partition.PartitionStrategy
 import org.apache.eagle.policy.common.Constants
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
@@ -73,48 +71,54 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   override def filter(fn : T => Boolean): StreamProducer[T] ={
     val ret = FilterProducer[T](fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
     val ret = FlatMapProducer[T,R](flatMapper)
-    hookup(this, ret)
+    connect(this, ret)
+    ret
+  }
+  override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = {
+    val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func))
+    connect(this, ret)
     ret
   }
 
+
   override def foreach(fn : T => Unit) : Unit = {
     val ret = ForeachProducer[T](fn)
-    hookup(this, ret)
+    connect(this, ret)
   }
 
   override def map[R](fn : T => R) : StreamProducer[R] = {
     val ret = MapperProducer[T,R](0,fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map1[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer[T,R](1, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map2[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer[T,R](2, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map3[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer(3, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map4[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer(4, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -125,7 +129,15 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     // validate each field index is greater or equal to 0
     fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
     val ret = GroupByFieldProducer[T](fields)
-    hookup(this, ret)
+    connect(this, ret)
+    ret
+  }
+
+  def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = {
+    // validate that each field index is greater than or equal to 0
+    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index must always be >= 0"))
+    val ret = GroupByFieldProducer[T](fields)
+    connect(this, ret)
     ret
   }
 
@@ -134,19 +146,19 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     // validate each field index is greater or equal to 0
     fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
     val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
     val ret = GroupByKeyProducer(keySelector)
-    hookup(this,ret)
+    connect(this,ret)
     ret
   }
 
   override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
     val ret = StreamUnionProducer[T, T2, T3](others)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -158,7 +170,7 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
     val ret = GroupByStrategyProducer(strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -173,9 +185,10 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     alert(upStreamNames.asScala, alertExecutorId, consume = false)
   }
 
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null) = {
-    val ret = AlertStreamSink(upStreamNames, alertExecutorId, consume, strategy)
-    hookup(this, ret)
+  override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
+    val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)
+    connect(this, ret)
+    ret
   }
 
   def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
@@ -196,28 +209,36 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
     val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   def aggregate(cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
     val ret= AggregateProducer(util.Arrays.asList(Constants.EAGLE_DEFAULT_POLICY_NAME), null, cql, strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
   
   def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
     val ret = PersistProducer(executorId, storageType)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
-  protected def hookup[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
+  def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
+    if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected")
     current.graph.addVertex(next)
     current.graph.addEdge(current, next, StreamConnector(current, next))
     passOnContext[T1,T2](current, next)
   }
 
+  def connect[T2]( next: StreamProducer[T2]) = {
+    if(this.graph == null) throw new NullPointerException(s"$this has not been registered to any graph before being connected")
+    this.graph.addVertex(next)
+    this.graph.addEdge(this, next, StreamConnector(this, next))
+    passOnContext[T,T2](this, next)
+  }
+
   private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
     next.initWith(current.graph,current.config)
   }
@@ -288,16 +309,21 @@ case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[
   }
 }
 
-case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]
+case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{
+  override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName})"
+}
+case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{
+  override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})"
+}
 
-case class AlertStreamSink(upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
-  def consume(consume: Boolean): AlertStreamSink = {
+case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
+  def consume(consume: Boolean): AlertStreamProducer = {
     this.consume = consume
     this
   }
 }
 
-case class AggregateProducer[T](upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
+case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
 
 case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
 

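The StreamProducer changes above rename hookup to connect, add a function-based flatMap overload next to the FlatMapper-based one, and make alert(...) return the new AlertStreamProducer. A minimal usage sketch, assuming an execution environment value env that mixes in the stream builder API (stream and executor names are hypothetical):

    import org.apache.eagle.datastream.Collector

    // New overload: pass a plain (input, collector) function instead of
    // implementing FlatMapper[R].
    val doubled = env.from(Seq(1, 2, 3))
      .flatMap((input: Any, collector: Collector[Int]) => {
        collector.collect(input.asInstanceOf[Int] * 2)
      })

    // alert(...) now returns AlertStreamProducer, so its fluent setter can
    // be chained directly.
    doubled.map1(v => ("metric", v))
      .alert(Seq("metricStream_1"), "alertExecutor", consume = false, strategy = null)
      .consume(true)
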
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
index 3ed067d..1330f06 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -19,12 +19,10 @@ package org.apache.eagle.datastream.core
 
 import com.typesafe.config.Config
 import org.apache.commons.lang3.builder.HashCodeBuilder
-import org.apache.eagle.datastream.FlatMapper
+import org.apache.eagle.datastream.{Collector, FlatMapper}
 import org.apache.eagle.partition.PartitionStrategy
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 
-import scala.reflect.runtime.{universe => ru}
-
 /**
  * StreamInfo should be fully serializable and having not runtime type information
  */
@@ -39,7 +37,11 @@ class StreamInfo  extends Serializable{
    */
   var name: String = null
 
+  /**
+    * Output stream id, equals to name by default
+    */
   var streamId:String=null
+
   var parallelismNum: Int = 1
 
   /**
@@ -108,6 +110,7 @@ trait StreamProtocol[+T <: Any]{
    * @return
    */
   def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R]
+  def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R]
 
   /**
    *
@@ -152,7 +155,7 @@ trait StreamProtocol[+T <: Any]{
   def groupByKey(keyer:T => Any):StreamProducer[T]
 
   def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy)
+  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy):AlertStreamProducer
 
   def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
index 9884e88..5f3bd22 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
@@ -44,4 +44,20 @@ trait StreamSourceBuilder {
     p.initWith(dag,config.get)
     p
   }
+
+  def from[T:ru.TypeTag](iterator: Iterator[T],recycle:Boolean):IteratorStreamProducer[T]={
+    val p = IteratorStreamProducer[T](iterator)
+    p.initWith(dag,config.get)
+    p
+  }
+
+  def from(product: Product):IteratorStreamProducer[Any]={
+    val p = IteratorStreamProducer[Any](product.productIterator)
+    p.initWith(dag,config.get)
+    p
+  }
+
+  def register[T](producer:StreamProducer[T]):Unit = {
+    producer.initWith(dag,config.get)
+  }
 }
\ No newline at end of file
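
A short sketch of the new StreamSourceBuilder entry points above (the env value is hypothetical):

    // Iterator-backed source; note the recycle flag is accepted but not
    // forwarded, since IteratorStreamProducer has no recycle support.
    val s1 = env.from(Iterator(1, 2, 3), recycle = false)

    // Product source: iterates over the elements of a tuple or case class.
    val s2 = env.from(("a", 1, true))

    // register(...) attaches an externally constructed producer to the
    // environment's DAG and config.
    env.register(IterableStreamProducer(Seq("x", "y")))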

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
index e109118..64b5f0f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
  * @param streamInfo
  * @tparam T
  */
-abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
+abstract class AbstractStreamBolt[T](val fieldsNum:Int=1, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
   private var _collector: OutputCollector = null
   private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
index e3b6e25..c64ea83 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
@@ -52,10 +52,10 @@ case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(i
         _collector.emit(List(current).asJava)
       }
     }else if(recycle){
-      LOG.info("Recycling the iterator")
+      if(LOG.isDebugEnabled) LOG.debug("Recycling the iterator")
       _iterator = iterable.iterator
     }else{
-      LOG.info("No tuple left, sleep forever")
+      if(LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever")
       this.deactivate()
       Utils.sleep(Long.MaxValue)
     }
@@ -65,7 +65,7 @@ case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(i
     if(info.outKeyed) {
       declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
     }else{
-      declarer.declare(new Fields(NameConstants.FIELD_PREFIX))
+      declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0"))
     }
   }
 }
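
The renamed un-keyed output field aligns IterableStreamSpout with the AbstractStreamBolt default of fieldsNum = 1 above: tuples are now emitted under an indexed field name. A hedged sketch of a matching downstream declaration (FIELD_PREFIX's actual value is not shown in this patch):

    import backtype.storm.tuple.Fields
    import org.apache.eagle.datastream.utils.NameConstants

    // Matches the spout's new un-keyed output, e.g. "f0" if FIELD_PREFIX is "f".
    val unkeyedFields = new Fields(s"${NameConstants.FIELD_PREFIX}0")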

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
new file mode 100644
index 0000000..ea6d658
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+import org.apache.eagle.datastream.core.StreamInfo
+import org.apache.eagle.datastream.utils.NameConstants
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+case class IteratorStreamSpout(iterator: Iterator[Any])(implicit info:StreamInfo) extends BaseRichSpout {
+  val LOG = LoggerFactory.getLogger(classOf[IteratorStreamSpout])
+  var _collector:SpoutOutputCollector=null
+  var _iterator:Iterator[Any] = null
+
+  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
+    this._collector = collector
+    this._iterator = iterator
+  }
+
+  override def nextTuple(): Unit = {
+    if(_iterator.hasNext){
+      val current = _iterator.next().asInstanceOf[AnyRef]
+      if(info.outKeyed) {
+        _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]])
+      }else{
+        _collector.emit(List(current).asJava)
+      }
+    }else{
+      if(LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever")
+      this.deactivate()
+      Utils.sleep(Long.MaxValue)
+    }
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    if(info.outKeyed) {
+      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
+    }else{
+      declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0"))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
index 0001d2f..19305fa 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
@@ -33,11 +33,15 @@ case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDe
 
   override def deserialize(bytes: Array[Byte]): AnyRef = {
     var map: util.Map[String, _] = null
-    try {
-      map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
-    } catch {
-      case e: IOException => {
-        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
+    if(bytes == null || bytes.length == 0){
+      if(LOG.isDebugEnabled) LOG.debug("Skip empty message")
+    }else {
+      try {
+        map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
+      } catch {
+        case e: IOException => {
+          LOG.error("Failed to deserialize json from: " + new String(bytes), e)
+        }
       }
     }
     map
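
A minimal behavioral sketch of the patched deserializer (standalone construction shown purely for illustration):

    import java.util.Properties

    val deserializer = JsonMessageDeserializer(new Properties())

    // Well-formed JSON is parsed into a java.util.TreeMap.
    val ok = deserializer.deserialize("""{"metric":"cpu","value":0.95}""".getBytes("UTF-8"))

    // Null, empty, or malformed input yields null (malformed input logs the
    // error), so downstream consumers must tolerate a null return value.
    val skipped = deserializer.deserialize(Array.emptyByteArray)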

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index 29b5cf4..42a030d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -35,9 +35,14 @@ object StormBoltFactory {
         }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){
           worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config)
           StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]])
-        }else {
-          throw new UnsupportedOperationException
+        }else if(worker.isInstanceOf[FlatMapperWrapper[Any]]){
+          StormFlatFunctionWrapper(worker.asInstanceOf[FlatMapperWrapper[Any]].func)
+        } else {
+          StormFlatMapperWrapper(worker)
         }
+//        else {
+//          throw new UnsupportedOperationException(s"Unsupported FlatMapperProducer type: $producer")
+//        }
       }
       case filter:FilterProducer[Any] => {
         FilterBoltWrapper(filter.fn)
@@ -49,8 +54,8 @@ object StormBoltFactory {
         ForeachBoltWrapper(foreach.fn)
       }
       case persist : PersistProducer[Any] => {
-        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString());
-        persisExecutor.prepareConfig(config);
+        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString)
+        persisExecutor.prepareConfig(config)
         JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
       }
       case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
index 4e7d743..4165db4 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
@@ -19,15 +19,12 @@ package org.apache.eagle.datastream.storm
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
 import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
-import org.apache.eagle.datastream.core.{ExecutionEnvironmentBase, StormSourceProducer, StreamDAG}
-
+import org.apache.eagle.datastream.core.{ExecutionEnvironment, StormSourceProducer, StreamDAG}
 
 /**
  * @since  12/7/15
  */
-case class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironmentBase(conf){
-
-
+class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironment(conf) {
   override def execute(dag: StreamDAG) : Unit = {
     StormTopologyCompiler(config.get, dag).buildTopology.execute
   }
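
Since StormExecutionEnvironment is no longer a case class, call sites must instantiate it with new (a sketch, assuming a standard Typesafe Config load):

    import com.typesafe.config.ConfigFactory

    val env = new StormExecutionEnvironment(ConfigFactory.load())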

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
new file mode 100644
index 0000000..e5eea1f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.Collector
+import org.apache.eagle.datastream.core.StreamInfo
+
+case class StormFlatFunctionWrapper(flatMapper:(Any,Collector[Any])=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
+  /**
+   * Handle keyed stream value
+   */
+  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
+    flatMapper(value,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+
+  /**
+   * Handle general stream values list
+   *
+   * @param values
+   */
+  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
+    flatMapper(values,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
new file mode 100644
index 0000000..e5fb86d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.{Collector, FlatMapper}
+import org.apache.eagle.datastream.core.StreamInfo
+import scala.collection.JavaConverters._
+
+case class StormFlatMapperWrapper(flatMapper:FlatMapper[Any])(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
+  /**
+   * Handle keyed stream value
+   */
+  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
+    flatMapper.flatMap(Seq(value.asInstanceOf[AnyRef]),new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+
+  /**
+   * Handle general stream values list
+   *
+   * @param values
+   */
+  override def onValues(values: java.util.List[AnyRef])(implicit input: Tuple): Unit = {
+    flatMapper.flatMap(values.asScala,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
index 74ed11d..6a3b606 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
@@ -22,7 +22,7 @@ import java.util
 
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.{IterableStreamProducer, StormSourceProducer, StreamProducer}
+import org.apache.eagle.datastream.core.{IteratorStreamProducer, IterableStreamProducer, StormSourceProducer, StreamProducer}
 import org.apache.eagle.datastream.utils.NameConstants
 
 object StormSpoutFactory {
@@ -37,6 +37,8 @@ object StormSpoutFactory {
         }
       case p@IterableStreamProducer(iterable,recycle) =>
         IterableStreamSpout(iterable,recycle)
+      case p@IteratorStreamProducer(iterator) =>
+        IteratorStreamSpout(iterator)
       case _ =>
         throw new IllegalArgumentException(s"Cannot compile unknown $from to a Storm Spout")
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
index 18f52cb..f572377 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
@@ -36,7 +36,7 @@ case class StormTopologyCompiler(config: Config, graph: StreamProducerGraph) ext
   val boltCache = scala.collection.mutable.Map[StreamProducer[Any], StormBoltWrapper]()
 
   override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder();
+    val builder = new TopologyBuilder()
     val iter = graph.iterator()
     val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
     val stormTopologyGraph = ListBuffer[String]()


