eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [07/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code
Date Sun, 14 Aug 2016 06:23:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
deleted file mode 100644
index 85b5334..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "cluster"
-			"topologyName" : "dynamical-topology-5"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			},
-			"nimbusHost":"sandbox.hortonworks.com",
-			"nimbusThriftPort":6627
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps {
-			"site" : "sandbox"
-			"application": "HADOOP"
-		}
-	}
-	
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "sandbox.hortonworks.com:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "sandbox.hortonworks.com"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			"topic" = "metric_event_persist"
-			"bootstrap.servers" = "sandbox.hortonworks.com:6667"
-		}
-
-		Alert.defaultAlertExecutor {
-			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			// alertExecutorId = defaultAlertExecutor
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
deleted file mode 100644
index 7b552da..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{FlatSpec, Matchers}
-
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-class ConfigSpec extends FlatSpec with Matchers{
-  "Config" should "be overrode correctly" in {
-    val conf1 = ConfigFactory.parseString(
-      """
-        |value=1
-      """.stripMargin)
-    val conf2 = ConfigFactory.parseString(
-      """
-        |value=2
-      """.stripMargin)
-    val conf3 = conf1.withFallback(conf2)
-    val conf4 = conf2.withFallback(conf1)
-    conf3.getInt("value") should be(1)
-    conf4.getInt("value") should be(2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
deleted file mode 100644
index e63280a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier}
-import org.scalatest.{FlatSpec, Matchers}
-
-class DataFlowSpec extends FlatSpec with Matchers {
-  val dataFlowConfig =
-    """
-       |{
-       |	kafkaSource.metric_event_1 {
-       |    schema {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |		parallism = 1000
-       |		topic = "metric_event_1"
-       |		zkConnection = "localhost:2181"
-       |		zkConnectionTimeoutMS = 15000
-       |		consumerGroupId = "Consumer"
-       |		fetchSize = 1048586
-       |		transactionZKServers = "localhost"
-       |		transactionZKPort = 2181
-       |		transactionZKRoot = "/consumers"
-       |		transactionStateUpdateMS = 2000
-       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |	}
-       |
-       |	kafkaSource.metric_event_2 {
-       |		schema = {
-       |      metric: string
-       |      timestamp: long
-       |      value: double
-       |    }
-       |		parallism = 1000
-       |		topic = "metric_event_2"
-       |		zkConnection = "localhost:2181"
-       |		zkConnectionTimeoutMS = 15000
-       |		consumerGroupId = "Consumer"
-       |		fetchSize = 1048586
-       |		transactionZKServers = "localhost"
-       |		transactionZKPort = 2181
-       |		transactionZKRoot = "/consumers"
-       |		transactionStateUpdateMS = 2000
-       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-       |	}
-       |
-       |	kafkaSink.metricStore {}
-       |
-       |	alert.alert {
-       |		executor = "alertExecutor"
-       |	}
-       |
-       |	aggregator.aggreator {
-       |		executor = "aggreationExecutor"
-       |	}
-       |
-       |	metric_event_1|metric_event_2 -> alert {}
-       |	metric_event_1|metric_event_2 -> metricStore {}
-       |}
-     """.stripMargin
-
-  DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in {
-    val config = ConfigFactory.parseString(dataFlowConfig)
-    config should not be null
-    val dataflow = DataFlow.parse(config)
-    dataflow should not be null
-    dataflow.getConnectors.size should be(4)
-    dataflow.getProcessors.size should be(5)
-  }
-
-  Identifier.getClass.toString should "parse as definition" in {
-    val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier]
-    defId.moduleType should be("kafka")
-  }
-
-  Identifier.getClass.toString should "parse node1 -> node2 as connection" in {
-    val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(1)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2 -> node3" in {
-    val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(2)
-  }
-
-  Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-
-  Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in {
-    val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier]
-    id.fromIds.size should be(3)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
deleted file mode 100644
index 5e2007d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.scalatest.{FlatSpec, Matchers}
-
-class PipelineSpec extends FlatSpec with Matchers{
-  "Pipeline" should "parse successfully from pipeline_1.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_1.conf")
-    pipeline should not be null
-  }
-
-  "Pipeline" should "compile successfully from pipeline_2.conf" in {
-    val pipeline = Pipeline.parseResource("pipeline_2.conf")
-    pipeline should not be null
-    val stream = Pipeline.compile(pipeline)
-    stream should not be null
-    // Throw ClassNotFoundException when submit in unit test
-    // stream.submit[storm]
-  }
-}
-
-/**
- * Storm LocalCluster throws ClassNotFoundException when submit in unit test, so here submit in App
- */
-object PipelineSpec_2 extends App{
-  val pipeline = Pipeline(args).parseResource("pipeline_2.conf")
-  val stream = Pipeline.compile(pipeline)
-  stream.submit[storm]
-}
-
-object PipelineSpec_3 extends App {
-  Pipeline(args).submit[storm]("pipeline_3.conf")
-}
-
-object PipelineSpec_4 extends App {
-  Pipeline(args).submit[storm]("pipeline_4.conf")
-}
-
-object PipelineSpec_5 extends App {
-  Pipeline(args).submit[storm]("pipeline_5.conf")
-}
-
-object PipelineCLISpec extends App{
-  Pipeline.main(args)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
index e9be371..5e3fc9e 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
@@ -66,11 +66,6 @@
       </dependency>
       <dependency>
           <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-alert-process</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.eagle</groupId>
           <artifactId>eagle-stream-process-base</artifactId>
           <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
deleted file mode 100644
index b17c192..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.executor.PolicyProcessExecutor;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutor extends PolicyProcessExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-	private static final long serialVersionUID = 1L;
-
-	private ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> render = new AggregateResultRender();
-
-	public AggregateExecutor(String executorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
-			PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
-		super(executorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
-				AggregateDefinitionAPIEntity.class);
-	}
-
-	@Override
-	public ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> getResultRender() {
-		return render;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
deleted file mode 100644
index 5093685..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Dec 16, 2015
- *
- */
-public class AggregateExecutorFactory {
-
-	private static final Logger LOG = LoggerFactory.getLogger(AggregateExecutorFactory.class);
-	
-	private AggregateExecutorFactory() {}
-	public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
-
-
-	public IPolicyExecutor[] createExecutors(List<String> streamNames, String cql) throws Exception {
-		int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
-
-		IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
-		String[] upStreams = streamNames.toArray(new String[0]);
-		for (int i = 0; i < numPartitions ; i++ ) {
-			executors[i] = new SimpleAggregateExecutor(upStreams, cql, "siddhiCEPEngine", i, numPartitions);
-		}
-
-		return executors;
-	}
-
-	public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
-		StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
-        int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
-		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>(
-				new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
-		return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
-	}
-	
-	@SuppressWarnings("unchecked")
-	private int loadExecutorConfig(Config config, String executorId, StringBuilder partitionerCls) {
-		int numPartitions = 0;
-		String aggregateExecutorConfigsKey = "aggregateExecutorConfigs";
-        if(config.hasPath(aggregateExecutorConfigsKey)) {
-            Map<String, ConfigValue> analyzeExecutorConfigs = config.getObject(aggregateExecutorConfigsKey);
-            if(analyzeExecutorConfigs !=null && analyzeExecutorConfigs.containsKey(executorId)) {
-                Map<String, Object> alertExecutorConfig = (Map<String, Object>) analyzeExecutorConfigs.get(executorId).unwrapped();
-                int parts = 0;
-                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
-                numPartitions = parts == 0 ? 1 : parts;
-                if(alertExecutorConfig.containsKey("partitioner")) {
-                	partitionerCls.setLength(0);
-                	partitionerCls.append((String) alertExecutorConfig.get("partitioner"));
-                }
-            }
-        }
-        return numPartitions;
-	}
-
-//	private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
-//		// Get map from alertExecutorId to alert stream
-//		// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
-//		List<String> streamNames = new ArrayList<String>();
-//		// FIXME : here we reuse the executor definition. But the name alert is not ambiguous now.
-//		AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
-//		List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource,
-//				executorId);
-//		for (AlertExecutorEntity entity : alertExecutorEntities) {
-//			streamNames.add(entity.getTags().get(Constants.STREAM_NAME));
-//		}
-//		return streamNames;
-//	}
-	
-	private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
-			List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
-					throws Exception {
-		LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: "
-				+ numPartitions + ", Partition class is: " + partitionerCls);
-
-		PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
-		AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
-		String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
-
-		for (int i = 0; i < numPartitions; i++) {
-			alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
-					_sourceStreams);
-		}
-		return alertExecutors;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
deleted file mode 100644
index 986885a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.ResultRender;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Created on 12/29/15.
- */
-public class AggregateResultRender implements ResultRender<AggregateDefinitionAPIEntity, AggregateEntity>, Serializable {
-
-
-    @Override
-    public AggregateEntity render(Config config,
-                                  List<Object> rets,
-                                  PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> siddhiAlertContext,
-                                  long timestamp) {
-        AggregateEntity result = new AggregateEntity();
-        for (Object o : rets) {
-            result.add(o);
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
deleted file mode 100644
index e0dadbf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
-import org.apache.hadoop.hbase.util.MD5Hash;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Only one policy for one simple aggregate executor
- *
- * Created on 1/10/16.
- */
-public class SimpleAggregateExecutor
-        extends JavaStormStreamExecutor2<String, AggregateEntity>
-        implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class);
-
-    private final String cql;
-    private final int partitionSeq;
-    private final int totalPartitionNum;
-
-    private final String[] upStreamNames;
-    private String policyId;
-    private String executorId;
-    private Config config;
-    private AggregateDefinitionAPIEntity aggDef;
-    private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
-
-    public SimpleAggregateExecutor(String[] upStreams, String cql, String policyType, int partitionSeq, int totalPartitionNum) {
-        this.cql = cql;
-        this.partitionSeq = partitionSeq;
-        this.upStreamNames = upStreams;
-        this.totalPartitionNum = totalPartitionNum;
-        // create an fixed definition policy api entity, and indicate it has full definition
-        aggDef = new AggregateDefinitionAPIEntity();
-        aggDef.setTags(new HashMap<String, String>());
-        aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
-        // TODO make it more general, not only hard code siddhi cep support here.
-        try {
-            Map<String,Object> template = new HashMap<>();
-            template.put("type","siddhiCEPEngine");
-            template.put("expression",this.cql);
-            template.put("containsDefinition",true);
-            aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
-        } catch (Exception e) {
-            LOG.error("Simple aggregate generate policy definition failed!", e);
-        }
-        aggDef.setCreatedTime(new Date().getTime());
-        aggDef.setLastModifiedDate(new Date().getTime());
-        aggDef.setName("anonymous-aggregation-def");
-        aggDef.setOwner("anonymous");
-        aggDef.setEnabled(true);
-        aggDef.setDescription("anonymous aggregation definition");
-
-        String random = MD5Hash.getMD5AsHex(cql.getBytes());
-        policyId = "anonymousAggregatePolicyId-" + random;
-        executorId= "anonymousAggregateId-" +random;
-    }
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        evaluator = createPolicyEvaluator(aggDef);
-    }
-
-    /**
-     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
-     *
-     * @return PolicyEvaluator instance
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity alertDef) {
-        String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
-        Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
-        if (evalCls == null) {
-            String msg = "No policy evaluator defined for policy type : " + policyType;
-            LOG.error(msg);
-            throw new IllegalStateException(msg);
-        }
-
-        AbstractPolicyDefinition policyDef = null;
-        try {
-            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
-                    PolicyManager.getInstance().getPolicyModules(policyType));
-        } catch (Exception ex) {
-            LOG.error("Fail initial alert policy def: " + alertDef.getPolicyDef(), ex);
-        }
-
-        PolicyEvaluator<AggregateDefinitionAPIEntity> pe;
-        PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context = new PolicyEvaluationContext<>();
-        context.policyId = alertDef.getTags().get("policyId");
-        context.alertExecutor = this;
-        context.resultRender = new AggregateResultRender();
-        try {
-            // create evaluator instances
-            pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
-                    .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-                    .newInstance(config, context, policyDef, upStreamNames, false);
-        } catch (Exception ex) {
-            LOG.error("Fail creating new policyEvaluator", ex);
-            LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
-            throw new IllegalStateException(ex);
-        }
-        return pe;
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-        if (input.size() != 3)
-            throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
-        if (LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
-        if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + evaluator);
-
-        try {
-            evaluator.evaluate(new ValuesArray(collector, input.get(1), input.get(2)));
-        } catch (Exception ex) {
-            LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
-        }
-    }
-
-    @Override
-    public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context, List<AggregateEntity> alerts) {
-        if (alerts != null && !alerts.isEmpty()) {
-            String policyId = context.policyId;
-            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
-            Collector outputCollector = context.outputCollector;
-            PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator = context.evaluator;
-            for (AggregateEntity entity : alerts) {
-                synchronized (this) {
-                    outputCollector.collect(new Tuple2(policyId, entity));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
-                }
-            }
-        }
-    }
-
-    @Override
-    public String getExecutorId() {
-        return executorId;
-    }
-
-    @Override
-    public int getPartitionSeq() {
-        return partitionSeq;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
deleted file mode 100644
index 62830ae..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * entity of stream analyze definition
- *
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("aggregatedef")
-@ColumnFamily("f")
-@Prefix("aggregatedef")
-@Service(Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "dataSource", "executorId", "policyId", "policyType"})
-@Indexes({
-	@Index(name="Index_1_aggregateExecutorId", columns = { "executorId" }, unique = true),
-})
-@SuppressWarnings("serial")
-public class AggregateDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
-
-	@Column("a")
-	private String name;
-	@Column("b")
-	private String policyDef;
-	@Column("c")
-	private String description;
-	@Column("d")
-	private boolean enabled;
-	@Column("e")
-	private String owner;
-	@Column("f")
-	private long lastModifiedDate;
-	@Column("g")
-	private long createdTime;
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	public String getPolicyDef() {
-		return policyDef;
-	}
-
-	public void setPolicyDef(String policyDef) {
-		this.policyDef = policyDef;
-		valueChanged("policyDef");
-	}
-
-	public String getDescription() {
-		return description;
-	}
-
-	public void setDescription(String description) {
-		this.description = description;
-		valueChanged("description");
-	}
-
-	public boolean isEnabled() {
-		return enabled;
-	}
-
-	public void setEnabled(boolean enabled) {
-		this.enabled = enabled;
-		valueChanged("enabled");
-	}
-
-	public String getOwner() {
-		return owner;
-	}
-
-	public void setOwner(String owner) {
-		this.owner = owner;
-		valueChanged("owner");
-	}
-
-	public long getLastModifiedDate() {
-		return lastModifiedDate;
-	}
-
-	public void setLastModifiedDate(long lastModifiedDate) {
-		this.lastModifiedDate = lastModifiedDate;
-		valueChanged("lastModifiedDate");
-	}
-
-	public long getCreatedTime() {
-		return createdTime;
-	}
-
-	public void setCreatedTime(long createdTime) {
-		this.createdTime = createdTime;
-		valueChanged("createdTime");
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
deleted file mode 100644
index 64c20b2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Event entity during stream processing
- * 
- * @since Dec 17, 2015
- *
- */
-public class AggregateEntity implements Serializable {
-
-	private static final long serialVersionUID = 5911351515190098292L;
-
-    private List<Object> data = new LinkedList<>();
-
-    public void add(Object res) {
-        data.add(res);
-    }
-
-    public List<Object> getData() {
-        return data;
-    }
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
deleted file mode 100644
index 7c932d4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-/**
- * Created on 1/6/16.
- */
-public class AggregateEntityRepository extends EntityRepository {
-    public AggregateEntityRepository() {
-        entitySet.add(AggregateDefinitionAPIEntity.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
deleted file mode 100644
index 0732639..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-/**
- * Interface by the stream framework to storage
- * 
- * @since Dec 19, 2015
- *
- */
-public interface IPersistService<T> {
-
-	boolean save(String stream, T apiEntity) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
deleted file mode 100644
index 2e1754b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.core.StorageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.text.MessageFormat;
-import java.util.List;
-
-/**
- *
- * TODO: currently only accept to be used after aggregation node (See the AggregateEntity reference here).
- * @since Dec 19, 2015
- *
- */
-public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class);
-
-	private Config config;
-	private IPersistService<AggregateEntity> persistService;
-	private String persistExecutorId;
-	private String persistType;
-
-	public PersistExecutor(String persistExecutorId, String persistType) {
-		this.persistExecutorId = persistExecutorId;
-		this.persistType = persistType;
-	}
-
-    @Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-
-    @Override
-	public void init() {
-		if (persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) {
-			Config subConfig = this.config.getConfig("persistExecutorConfigs" + "." + persistExecutorId);
-			persistService = new KafkaPersistService(subConfig);
-		} else {
-			throw new RuntimeException(String.format("Persist type '%s' not supported yet!", persistService));
-		}
-	}
-
-	@Override
-	public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-		if (input.size() != 2) {
-			LOG.error(String.format("Persist executor expect two elements per tuple. But actually got size %d lists!",
-					input.size()));
-			return;
-		}
-
-		String policyId = (String) input.get(0);
-		AggregateEntity entity = (AggregateEntity) input.get(1);
-		try {
-			persistService.save("defaultOutput", entity);
-		} catch (Exception e) {
-			LOG.error(MessageFormat.format("persist entity failed: {0}", entity), e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
deleted file mode 100644
index ea61278..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * TODO: configurable null handling for serialization??
- * Created on 1/4/16.
- */
-public class AggregateEntitySerializer implements
-        Closeable, AutoCloseable, Serializer<AggregateEntity> {
-
-    private final StringSerializer stringSerializer = new StringSerializer();
-    private static final Logger logger = LoggerFactory.getLogger(AggregateEntitySerializer.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    static {
-        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-
-    }
-
-    @Override
-    public byte[] serialize(String topic, AggregateEntity data) {
-        String str = null;
-        try {
-            str = om.writeValueAsString(data.getData());
-        } catch (IOException e) {
-            logger.error("Kafka serialization for send error!", e);
-        }
-        return stringSerializer.serialize(topic, str);
-    }
-
-    @Override
-    public void close() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
deleted file mode 100644
index 919b92e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.IPersistService;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.*;
-import java.util.concurrent.Future;
-
-/**
- * TODO : support more general entity input
- * @since Dec 21, 2015
- *
- */
-public class KafkaPersistService implements IPersistService<AggregateEntity> {
-
-	private static final String ACKS = "acks";
-	private static final String RETRIES = "retries";
-	private static final String BATCH_SIZE = "batchSize";
-	private static final String LINGER_MS = "lingerMs";
-	private static final String BUFFER_MEMORY = "bufferMemory";
-	private static final String KEY_SERIALIZER = "keySerializer";
-	private static final String VALUE_SERIALIZER = "valueSerializer";
-	private static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
-	
-	private KafkaProducer<String, AggregateEntity> producer;
-	private final Config config;
-	private final SortedMap<String, String> streamTopicMap;
-	private final Properties props;
-	
-	/**
-	 * <pre>
-	 * props.put("bootstrap.servers", "localhost:4242");
-	 * props.put("acks", "all");
-	 * props.put("retries", 0);
-	 * props.put("batch.size", 16384);
-	 * props.put("linger.ms", 1);
-	 * props.put("buffer.memory", 33554432);
-	 * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-	 * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-	 * </pre>
-	 */
-	public KafkaPersistService(Config config) {
-		this.config = config;
-		Config kafkaConfig = config.getConfig("kafka");
-		if (kafkaConfig == null) {
-			throw new IllegalStateException("Druid persiste service failed to find kafka configurations!");
-		}
-		props = new Properties();
-		if (kafkaConfig.hasPath(BOOTSTRAP_SERVERS)) {
-			props.put("bootstrap.servers", kafkaConfig.getString(BOOTSTRAP_SERVERS));
-		}
-		if (kafkaConfig.hasPath(ACKS)) {
-			props.put(ACKS, kafkaConfig.getString(ACKS));
-		}
-		if (kafkaConfig.hasPath(RETRIES)) {
-			props.put(RETRIES, kafkaConfig.getInt(RETRIES));
-		}
-		if (kafkaConfig.hasPath(BATCH_SIZE)) {
-			props.put("batch.size", kafkaConfig.getInt(BATCH_SIZE));
-		}
-		if (kafkaConfig.hasPath(LINGER_MS)) {
-			props.put("linger.ms", kafkaConfig.getInt(LINGER_MS));
-		}
-		if (kafkaConfig.hasPath(BUFFER_MEMORY)) {
-			props.put("buffer.memory", kafkaConfig.getLong(BUFFER_MEMORY));
-		}
-		if (kafkaConfig.hasPath(KEY_SERIALIZER)) {
-			props.put("key.serializer", kafkaConfig.getString(KEY_SERIALIZER));
-		} else {
-			props.put("key.serializer", StringSerializer.class.getCanonicalName());
-		}
-//		if (kafkaConfig.hasPath(VALUE_SERIALIZER)) {
-//			props.put("value.serializer", kafkaConfig.getString(VALUE_SERIALIZER));
-//		}
-		props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName());
-
-		streamTopicMap = new TreeMap<>();
-		if (kafkaConfig.hasPath("topics")) {
-			Config topicConfig = kafkaConfig.getConfig("topics");
-			Set<Map.Entry<String, ConfigValue>> topics = topicConfig.entrySet();
-			for (Map.Entry<String, ConfigValue> t : topics) {
-				streamTopicMap.put(t.getKey(), (String) t.getValue().unwrapped());
-			}
-		}
-
-		producer = new KafkaProducer<>(props);
-	}
-
-	@Override
-	public boolean save(String stream, AggregateEntity apiEntity) throws Exception {
-		if (streamTopicMap.get(stream) != null) {
-			ProducerRecord<String, AggregateEntity> record = new ProducerRecord<>(streamTopicMap.get(stream), apiEntity);
-			Future<RecordMetadata> future = producer.send(record);
-			// TODO : more for check the sending status
-			return true;
-		}
-		return false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
index 2323f39..e9ba3ca 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
@@ -43,9 +43,6 @@ public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider {
 			if(conf.equalsIgnoreCase("data collection")){
 				return new DataCollectionHDFSSpout(configContext); 
 			}
-			if(conf.equalsIgnoreCase("user profile generation")){
-				return new UserProfileGenerationHDFSSpout(configContext); 
-			}
 			return null;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
deleted file mode 100644
index e07ee81..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.StreamingProcessConstants;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.esotericsoftware.minlog.Log;
-
-public class UserProfileGenerationHDFSSpout extends HDFSSourcedStormSpoutProvider.HDFSSpout {
-
-	private static final long serialVersionUID = 2274234104008894386L;
-	private Config configContext;
-	private TopologyContext _context; 
-	SpoutOutputCollector _collector;
-	
-	public class UserProfileData implements Serializable{
-		private static final long serialVersionUID = -3315860110144736840L;
-		private String user; 
-		private List<String> dateTime = new ArrayList<String>(); 
-		private List<Integer> hrInDay = new ArrayList<Integer>(); 
-		private List<String> line = new ArrayList<String>();
-		
-		public String getUser() {
-			return user;
-		}
-		public void setUser(String user) {
-			this.user = user;
-		}
-		public String getDateTime(int index) {
-			return dateTime.get(index);
-		}
-		public List<String> getDateTimes() {
-			return dateTime;
-		}
-		public void setDateTime(String dateTime) {
-			this.dateTime.add(dateTime);
-		}
-		public int getHrInDay(int index) {
-			return hrInDay.get(index);
-		}
-		public List<Integer> getHrsInDay() {
-			return hrInDay;
-		}
-		public void setHrInDay(int hrInDay) {
-			this.hrInDay.add(hrInDay);
-		}
-		public String getLine(int index) {
-			return line.get(index);
-		}
-		public List<String> getLines() {
-			return line;
-		}
-		public void setLine(String line) {
-			this.line.add(line);
-		} 
-		
-	}
-	
-	private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class);
-	
-	public UserProfileGenerationHDFSSpout(Config configContext){
-		this.configContext = configContext;
-		LOG.info("UserProfileGenerationHDFSSpout called");
-	}
-	
-	public void copyFiles(){
-		LOG.info("Inside listFiles()");
-		//Configuration conf = new Configuration();
-		JobConf conf = new JobConf();
-		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-		ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-        if(LOG.isDebugEnabled()) {
-			for (URL url : urls) {
-				LOG.debug(url.getFile());
-			}
-		}
-		// _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-       
-		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-		LOG.info("HDFS path: " + hdfsPath);
-		 
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("copyToPath: " + copyToPath);
-		String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
-		Path srcPath = new Path(srcPathStr); 
-		LOG.info("listFiles called");
-		LOG.info("srcPath: " + srcPath);
-		try {
-			FileSystem fs = srcPath.getFileSystem(conf);
-			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
-			CompressionCodec codec = codecFactory.getCodec(srcPath);
-			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-			*/
-			
-			Path destPath = new Path(copyToPath);
-			LOG.info("Destination path: " + destPath);
-			String userListFileName = configContext.getString("dataSourceConfig.userList");
-			//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-			List<String> userList = getUser(userListFileName);
-			for(String user:userList){
-				Path finalSrcPath = new Path(srcPath.getName() + "/" + user);
-				fs.copyToLocalFile(finalSrcPath, destPath);
-			}
-			LOG.info("Copy to local succeed");
-			fs.close();
-							
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-	}
-	
-	private List<String> getAllFiles(String root, int level){
-		
-		List<String> lists = new ArrayList<String>();
-		File rootFile = new File(root);
-		File[] tempList = rootFile.listFiles();
-		if(tempList == null)
-			return lists; 
-		
-		for(File temp:tempList){
-			if(temp.isDirectory())
-				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-			else{
-				if(temp.getName().endsWith(".csv"))
-					lists.add(temp.getAbsolutePath());
-			}
-		}
-		return lists;
-			
-	}
-	
-	public List<String> listFiles(String path){
-		
-		LOG.info("Reading from: " + path);
-		List<String> files = new ArrayList<String>();
-		files = getAllFiles(path, 0); 
-		return files;
-	}
-	
-	private List<String> getUser(String listFileName){
-		List<String> userList = new ArrayList<String>();
-		BufferedReader reader = null; 
-		try{
-			InputStream is = getClass().getResourceAsStream(listFileName);
-			reader = new BufferedReader(new InputStreamReader(is));
-			String line = ""; 
-			while((line = reader.readLine()) != null){
-				userList.add(line);
-				LOG.info("User added:" + line);
-			}
-		}catch(Exception e){
-			e.printStackTrace();
-		}finally{
-			try {
-				if(reader != null)
-					reader.close();
-			} catch (IOException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			}
-		}
-		return userList;
-	}
-	
-	@Override
-	public void nextTuple() {
-		LOG.info("Releasing nextTuple");
-		
-		String userListFileName = configContext.getString("dataSourceConfig.userList");
-
-		//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-		List<String> userList = getUser(userListFileName);
-		//loggerHDFSSpout.info("user list size:" + userList.size());
-		for(String user: userList){
-			LOG.info("Processing user: " + user);
-			String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-			//loggerHDFSSpout.info("copyToPath: " + copyToPath);
-			
-			copyToPath +="/" + user; 
-			List<String> files = listFiles(copyToPath);
-			LOG.info("Files returned: " + files.size());
-			String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-			//loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile);
-			UserProfileData usersProfileDataset = new UserProfileData();
-				
-			for(String fileName:files){
-				LOG.info("FileName: " + fileName);
-				usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf(".")));
-				BufferedReader br = null; 
-				Reader decoder = null;
-				InputStream inStream = null;
-				
-				try{
-					inStream = new FileInputStream(new File(fileName));
-					decoder = new InputStreamReader(inStream);
-					br = new BufferedReader(decoder);
-					int lineNo = 0; 
-					String line = "";
-					while((line = br.readLine())!= null){
-						boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader");
-						//loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader);
-						if(containsFileHeader == true && lineNo == 0){
-							// ignore the header column
-							lineNo++;
-							continue;
-						}
-			        	//loggerHDFSSpout.info("emitting line from file: " + fileName);
-			        	
-						usersProfileDataset.setLine(line);
-						usersProfileDataset.setHrInDay(lineNo);
-			        	lineNo++;
-					}
-				}
-				catch (Exception e) {
-					Log.error("File operation failed");
-					throw new IllegalStateException();
-				}finally{
-					try {
-						if(br != null)
-							br.close();
-						if(decoder != null)
-							decoder.close();
-						if(inStream != null)
-							inStream.close();
-					} catch (IOException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-				}
-			}
-			usersProfileDataset.setUser(user);
-			_collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset));
-        	LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size());
-			Utils.sleep(1000);
-		}
-		this.close();
-	}
-	
-	@Override
-	public void open(Map arg0, TopologyContext context,
-			SpoutOutputCollector collector) {
-		 _collector = collector;
-		 _context = context;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// TODO Auto-generated method stub
-		declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP));
-	}
-	
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
deleted file mode 100644
index 993a4a2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-public class JavaMapperStormExecutor extends BaseRichBolt{
-    public static class e1 extends JavaMapperStormExecutor {
-        public e1(JavaMapper mapper){
-            super(1, mapper);
-        }
-    }
-    public static class e2 extends JavaMapperStormExecutor {
-        public e2(JavaMapper mapper){
-            super(2, mapper);
-        }
-    }
-    public static class e3 extends JavaMapperStormExecutor {
-        public e3(JavaMapper mapper){
-            super(3, mapper);
-        }
-    }
-    public static class e4 extends JavaMapperStormExecutor {
-        public e4(JavaMapper mapper){
-            super(4, mapper);
-        }
-    }
-
-    private JavaMapper mapper;
-    private OutputCollector collector;
-    private int numOutputFields;
-    public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){
-        this.numOutputFields = numOutputFields;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        List<Object> ret = mapper.map(input.getValues());
-        this.collector.emit(ret);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> fields = new ArrayList<String>();
-        for(int i=0; i<numOutputFields; i++){
-            fields.add(NameConstants.FIELD_PREFIX() + i);
-        }
-        declarer.declare(new Fields(fields));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
deleted file mode 100644
index a485d76..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import com.typesafe.config.Config;
-import scala.Tuple2;
-import scala.Tuple3;
-
-public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
-    private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
-    private String streamName;
-    public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){
-        this.delegate = delegate;
-        this.streamName = streamName;
-    }
-    @Override
-    public void prepareConfig(Config config) {
-        delegate.prepareConfig(config);
-    }
-
-    @Override
-    public void init() {
-        delegate.init();
-    }
-
-    @Override
-    public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) {
-        Collector delegateCollector = new Collector(){
-            @Override
-            public void collect(Object o) {
-                Tuple2 tuple2 = (Tuple2)o;
-                collector.collect(new Tuple3(tuple2._1, streamName, tuple2._2));
-            }
-        };
-        delegate.flatMap(input, delegateCollector);
-    }
-    
-    public JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> getDelegate() {
-    	return delegate;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
index 80e0aba..d59ded6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties
@@ -13,28 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=info, stdout, DRFA
+log4j.rootLogger=INFO, stdout
 
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinBolt=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinBolt=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
index 90e59cf..c3ddb00 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
@@ -36,37 +36,6 @@ import scala.reflect.runtime.universe._
 object ExecutionEnvironments{
   type storm = StormExecutionEnvironment
 
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
-    * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
 
   /**
    * @param typeTag

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
deleted file mode 100644
index 46f4738..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-trait AbstractTopologyCompiler{
-  def buildTopology : AbstractTopologyExecutor
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
deleted file mode 100644
index 1e1664a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-trait AbstractTopologyExecutor {
-  def execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
deleted file mode 100644
index e3f3050..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.core
-
-import com.typesafe.config.{Config, _}
-
-import scala.reflect.runtime.universe._
-
-/**
- * @since  12/4/15
- */
-case class Configuration(private var config:Config) extends Serializable{
-  def get:Config = config
-
-  def set[T<:AnyRef](key:String,value:T): Unit = {
-    config = config.withValue(key,ConfigValueFactory.fromAnyRef(value))
-  }
-
-  /**
-   *
-   * @param key config key
-   * @param default default value
-   * @tparam T return type
-   * @return
-   */
-  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
-    if(get.hasPath(key)) {
-      get(key)
-    } else default
-  }
-
-  def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match {
-    case STRING_TYPE => config.getString(key).asInstanceOf[T]
-    case TypeTag.Double => get.getDouble(key).asInstanceOf[T]
-    case TypeTag.Long => get.getLong(key).asInstanceOf[T]
-    case TypeTag.Int => get.getInt(key).asInstanceOf[T]
-    case TypeTag.Byte => get.getBytes(key).asInstanceOf[T]
-    case TypeTag.Boolean => get.getBoolean(key).asInstanceOf[T]
-    case NUMBER_TYPE => get.getNumber(key).asInstanceOf[T]
-    case OBJECT_TYPE => get.getObject(key).asInstanceOf[T]
-    case VALUE_TYPE => get.getValue(key).asInstanceOf[T]
-    case ANY_REF_TYPE => get.getAnyRef(key).asInstanceOf[T]
-    case INT_LIST_TYPE => get.getIntList(key).asInstanceOf[T]
-    case DOUBLE_LIST_TYPE => get.getDoubleList(key).asInstanceOf[T]
-    case BOOL_LIST_TYPE => get.getBooleanList(key).asInstanceOf[T]
-    case LONG_LIST_TYPE => get.getLongList(key).asInstanceOf[T]
-    case _ => throw new UnsupportedOperationException(s"$tag is not supported yet")
-  }
-
-  val STRING_TYPE = typeOf[String]
-  val NUMBER_TYPE = typeOf[Number]
-  val INT_LIST_TYPE = typeOf[List[Int]]
-  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
-  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
-  val LONG_LIST_TYPE = typeOf[List[Double]]
-  val OBJECT_TYPE = typeOf[ConfigObject]
-  val VALUE_TYPE = typeOf[ConfigValue]
-  val ANY_REF_TYPE = typeOf[AnyRef]
-}
\ No newline at end of file



Mime
View raw message