eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-477 eagle-data-process project clean up to contain only common processing eagle-data-process project clean up to contain only common processing
Date Fri, 19 Aug 2016 17:23:57 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop d6ec142d3 -> b31bac50b


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
new file mode 100644
index 0000000..7d69bbd
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm;
+
+import backtype.storm.topology.base.BaseRichSpout;
+
+import com.typesafe.config.Config;
+
+/**
+ * Supplies the spout of a Storm topology. The spout is normally a special part of the topology whose
+ * behavior lives in an underlying spout implementation; that implementation is obtained through
+ * {@link #getSpout(Config)}.
+ */
+public interface StormSpoutProvider {
+	/**
+	 * Returns the underlying spout implementation.
+	 *
+	 * @param context configuration handed to the provider
+	 * @return the spout to use in the topology
+	 */
+	BaseRichSpout getSpout(Config context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java
new file mode 100644
index 0000000..de06105
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm;
+
+import java.util.ArrayList;
+
+/**
+ * A list of arbitrary values: several data points are packed into a single
+ * {@code ValuesArray} object before being sent out.
+ */
+public class ValuesArray extends ArrayList<Object> {
+    private static final long serialVersionUID = -8218427810421668178L;
+
+    /** Creates an empty array. */
+    public ValuesArray() {
+        super();
+    }
+
+    /**
+     * Creates an array holding the given values in argument order.
+     *
+     * @param vals values to store; the backing list is pre-sized to {@code vals.length}
+     */
+    public ValuesArray(Object... vals) {
+        super(vals.length);
+        for (int i = 0; i < vals.length; i++) {
+            add(vals[i]);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
new file mode 100644
index 0000000..7454cc2
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * {@link StormSpoutProvider} that assembles a {@link KafkaSpout} from a Typesafe {@link Config}.
+ *
+ * <p>Settings are looked up under the configured prefix ({@code dataSourceConfig} unless another
+ * prefix is passed to the constructor). Keys read by {@link #getSpout(Config)}:
+ * <ul>
+ *   <li>required: {@code topic}, {@code zkConnection}, {@code schemeCls}</li>
+ *   <li>optional: {@code fetchSize}, {@code transactionZKRoot}, {@code consumerGroupId},
+ *       {@code brokerZkPath}, {@code txZkServers}, {@code transactionStateUpdateMS},
+ *       {@code startOffsetTime}, {@code forceFromStart}</li>
+ * </ul>
+ *
+ * <p>Since 6/8/16.
+ */
+public class KafkaSpoutProvider implements StormSpoutProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutProvider.class);
+    private static final String DEFAULT_CONFIG_PREFIX = "dataSourceConfig";
+    private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleConsumer";
+    private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
+    private static final String DEFAULT_TX_ZK_SERVER = "localhost:2181";
+    // NOTE(review): value kept from the original code; 1 MiB would be 1048576 - confirm the intent.
+    private static final int DEFAULT_FETCH_SIZE_BYTES = 1048586;
+    private static final long DEFAULT_STATE_UPDATE_INTERVAL_MS = 2000;
+
+    private String configPrefix = DEFAULT_CONFIG_PREFIX;
+
+    /** Creates a provider that reads its settings from the {@code dataSourceConfig} sub-tree. */
+    public KafkaSpoutProvider() {
+    }
+
+    /**
+     * Creates a provider that reads its settings from the given sub-tree.
+     *
+     * @param prefix path of the sub-tree holding the Kafka settings; when {@code null} the settings
+     *               are read directly from the config passed to {@link #getSpout(Config)}
+     */
+    public KafkaSpoutProvider(String prefix) {
+        this.configPrefix = prefix;
+    }
+
+    /**
+     * Builds a {@link KafkaSpout} for a single topic.
+     *
+     * @param config configuration containing the Kafka settings (under the config prefix, if any)
+     * @return a new, fully configured Kafka spout
+     * @throws IllegalStateException    if {@code schemeCls} is missing or cannot be instantiated
+     * @throws IllegalArgumentException if {@code txZkServers} is empty or its first entry has no
+     *                                  valid {@code host:port} form
+     */
+    @Override
+    public BaseRichSpout getSpout(Config config) {
+        Config context = config;
+        if (this.configPrefix != null) {
+            context = config.getConfig(configPrefix);
+        }
+
+        // the following is for fetching data from one topic
+        // Kafka topic
+        String topic = context.getString("topic");
+        // Kafka broker zk connection
+        String zkConnString = context.getString("zkConnection");
+        // Kafka fetch size
+        int fetchSize = context.hasPath("fetchSize") ? context.getInt("fetchSize") : DEFAULT_FETCH_SIZE_BYTES;
+        LOG.info(String.format("Use topic : %s, zkConnection : %s , fetchSize : %d", topic, zkConnString, fetchSize));
+
+        /*
+         the following is for recording offset for processing the data
+         the zk path to store current offset is comprised of the following
+         offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id
+
+         consumerGroupId is for differentiating different consumers which consume the same topic
+        */
+        // transaction zkRoot
+        String zkRoot = context.hasPath("transactionZKRoot") ? context.getString("transactionZKRoot") : DEFAULT_TRANSACTION_ZK_ROOT;
+        // Kafka consumer group id
+        String groupId = context.hasPath("consumerGroupId") ? context.getString("consumerGroupId") : DEFAULT_CONSUMER_GROUP_ID;
+        // optional zk path under which the brokers are registered
+        BrokerHosts hosts = context.hasPath("brokerZkPath")
+                ? new ZkHosts(zkConnString, context.getString("brokerZkPath"))
+                : new ZkHosts(zkConnString);
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                topic,
+                zkRoot + "/" + topic,
+                groupId);
+
+        // transaction zkServers and zkPort
+        configureTransactionZk(spoutConfig, context);
+        LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort);
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = context.hasPath("transactionStateUpdateMS")
+                ? context.getLong("transactionStateUpdateMS")
+                : DEFAULT_STATE_UPDATE_INTERVAL_MS;
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = fetchSize;
+        // "startOffsetTime" is for test usage, prod should not use this.
+        // Read as long: the target field also receives OffsetRequest.EarliestTime() below, and a
+        // millisecond timestamp does not fit into an int.
+        if (context.hasPath("startOffsetTime")) {
+            spoutConfig.startOffsetTime = context.getLong("startOffsetTime");
+        }
+        // "forceFromStart" is for test usage, prod should not use this.
+        // NOTE(review): only the presence of the key is checked, so "forceFromStart = false" still
+        // forces reading from the earliest offset - confirm whether that is intended.
+        if (context.hasPath("forceFromStart")) {
+            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+        }
+
+        if (!context.hasPath("schemeCls")) {
+            String err = "schemeCls must be present";
+            LOG.error(err);
+            throw new IllegalStateException(err);
+        }
+        try {
+            Scheme s = (Scheme) Class.forName(context.getString("schemeCls")).newInstance();
+            spoutConfig.scheme = new SchemeAsMultiScheme(s);
+        } catch (Exception ex) {
+            // log the cause as well, otherwise the reason of the failure is invisible in the log
+            LOG.error("error instantiating scheme object", ex);
+            throw new IllegalStateException(ex);
+        }
+        return new KafkaSpout(spoutConfig);
+    }
+
+    /**
+     * Fills {@code zkServers} and {@code zkPort} of the spout config from {@code txZkServers}, a
+     * comma separated list of {@code host:port} entries (default {@code localhost:2181}). The port
+     * is taken from the first entry only.
+     */
+    private static void configureTransactionZk(SpoutConfig spoutConfig, Config context) {
+        String[] txZkServers = context.hasPath("txZkServers")
+                ? context.getString("txZkServers").split(",")
+                : new String[] {DEFAULT_TX_ZK_SERVER};
+        if (txZkServers.length == 0) {
+            throw new IllegalArgumentException("txZkServers must contain at least one host:port entry");
+        }
+        // trim so that "a:2181, b:2181" does not produce a host name with a leading blank
+        spoutConfig.zkServers = Arrays.stream(txZkServers)
+                .map(server -> server.trim().split(":")[0])
+                .collect(Collectors.toList());
+        String[] firstHostAndPort = txZkServers[0].trim().split(":");
+        if (firstHostAndPort.length < 2) {
+            throw new IllegalArgumentException(
+                    "txZkServers entries must have the form host:port, but got: " + txZkServers[0]);
+        }
+        spoutConfig.zkPort = Integer.parseInt(firstHostAndPort[1].trim());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
new file mode 100644
index 0000000..76ca458
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy that turns a raw byte array into an object.
+ */
+public interface SpoutKafkaMessageDeserializer extends Serializable{
+	/**
+	 * Deserializes one raw message.
+	 *
+	 * @param arg0 the raw bytes to decode
+	 * @return the decoded object; its concrete type is defined by the implementation
+	 */
+	public Object deserialize(byte[] arg0);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
new file mode 100644
index 0000000..09193de
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Storm stream grouping that lets a {@link PartitionStrategy} pick the target task of a tuple,
+ * using the first tuple value (cast to {@code String}) as the partition key.
+ */
+public class CustomPartitionGrouping implements CustomStreamGrouping {
+
+    public List<Integer> targetTasks;
+    public PartitionStrategy strategy;
+
+    /**
+     * @param strategy strategy that maps a key and a bucket count to a bucket index
+     */
+    public CustomPartitionGrouping(PartitionStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    /** Keeps a private copy of the task ids this grouping may route to. */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        List<Integer> snapshot = new ArrayList<>(targetTasks);
+        this.targetTasks = snapshot;
+    }
+
+    /**
+     * Routes the tuple to exactly one task: the one at the index returned by the strategy.
+     *
+     * @return a single-element list with the chosen task id
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        final int bucketCount = targetTasks.size();
+        final Object partitionKey = values.get(0);
+        final int chosenIndex = strategy.balance((String) partitionKey, bucketCount);
+        final Integer chosenTask = targetTasks.get(chosenIndex);
+        return Arrays.asList(chosenTask);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java
new file mode 100644
index 0000000..d54fab9
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable data-access abstraction that reports how data is distributed over keys.
+ */
+public interface DataDistributionDao extends Serializable {
+
+    /**
+     * Fetches the weight of each key for the given time window.
+     *
+     * @param startTime start of the window; PartitionStrategyImpl passes epoch milliseconds
+     * @param endTime   end of the window; PartitionStrategyImpl passes the current epoch milliseconds
+     * @return the weights observed in the window
+     * @throws Exception if the distribution cannot be fetched
+     */
+    List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java
new file mode 100644
index 0000000..ca08bb1
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable algorithm that assigns weighted keys to buckets.
+ */
+public interface PartitionAlgorithm extends Serializable {
+    /**
+     * Computes a key-to-bucket assignment.
+     *
+     * @param weights the weighted keys to distribute
+     * @param k       PartitionStrategyImpl passes the number of buckets here
+     * @return map from key to an integer; PartitionStrategyImpl returns that integer as the bucket
+     *         index of the key
+     */
+    Map<String, Integer> partition(List<Weight> weights, int k);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java
new file mode 100644
index 0000000..ae05a6c
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy that maps a key to one of a fixed number of buckets.
+ */
+public interface PartitionStrategy extends Serializable {
+
+    /**
+     * Chooses the bucket for a key.
+     *
+     * @param key     the partition key
+     * @param buckNum the number of buckets available
+     * @return the chosen bucket; CustomPartitionGrouping uses it as an index into its task list, so
+     *         it has to be in the range {@code [0, buckNum)}
+     */
+    int balance(String key, int buckNum);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java
new file mode 100644
index 0000000..e36e75c
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import org.apache.commons.lang3.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStrategyImpl implements PartitionStrategy {
+
+    public DataDistributionDao dao;
+    public PartitionAlgorithm algorithm;
+    public Map<String, Integer> routingTable;
+    public long lastRefreshTime;
+    public long refreshInterval;
+    public long timeRange;
+    public static long DEFAULT_TIME_RANGE = 2 * DateUtils.MILLIS_PER_DAY;
+    public static long DEFAULT_REFRESH_INTERVAL = 2 * DateUtils.MILLIS_PER_HOUR;
+    private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval, long timeRange) {
+        this.dao = dao;
+        this.algorithm = algorithm;
+        this.refreshInterval = refreshInterval;
+        this.timeRange = timeRange;
+    }
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
+        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL, DEFAULT_TIME_RANGE);
+    }
+
+    public boolean needRefresh() {
+        if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) {
+            lastRefreshTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    public Map<String, Integer> generateRoutingTable(int buckNum) {
+        try {
+            long currentTime = System.currentTimeMillis();
+            List<Weight> weights = dao.fetchDataDistribution(currentTime - timeRange, currentTime);
+            routingTable = algorithm.partition(weights, buckNum);
+            return routingTable;
+        }
+        catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public int balance(String key, int buckNum) {
+        if (needRefresh()) {
+            LOG.info("Going to refresh routing table");
+            routingTable = generateRoutingTable(buckNum);
+            LOG.info("Finish refresh routing table");
+        }
+        if (routingTable.containsKey(key)) {
+            return routingTable.get(key);
+        }
+        else {
+            return Math.abs(key.hashCode()) % buckNum;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java
new file mode 100644
index 0000000..314812a
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+/**
+ * Mutable pair of a string key and its weight. Lists of {@code Weight} are returned by
+ * {@code DataDistributionDao#fetchDataDistribution} and consumed by {@code PartitionAlgorithm#partition}.
+ */
+public class Weight {
+    // identifier the weight belongs to
+    public String key;
+    // weight of the key; a boxed Double, so it can be null
+    public Double value;
+
+    /**
+     * @param key   identifier the weight belongs to
+     * @param value weight assigned to {@code key}
+     */
+    public Weight(String key, Double value) {
+        this.key = key;
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
new file mode 100644
index 0000000..f9515f5
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.zookeeper;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder of ZooKeeper related settings. All fields are public and mutable; nothing is
+ * validated or defaulted here.
+ */
+public class ZKStateConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // presumably the ZooKeeper connect string - verify against the code that reads it
+    public String zkQuorum;
+    // presumably the root znode under which state is kept - verify against the code that reads it
+    public String zkRoot;
+    // session timeout; milliseconds according to the field name
+    public int zkSessionTimeoutMs;
+    // number of retries
+    public int zkRetryTimes;
+    // NOTE(review): the unit of the retry interval is not visible here - confirm (ms?)
+    public int zkRetryInterval;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/resources/application.conf b/eagle-core/eagle-data-process/src/main/resources/application.conf
new file mode 100644
index 0000000..c386a71
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/resources/application.conf
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Format note: this file is HOCON, not strict JSON. Fields separated only by a newline (for example
+# "partitioner" / "needValidation" below) are valid, and "#" starts a comment.
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "SpadesMonitorTopology",
+    "stormConfigFile" : "spades-monitor-storm.yaml",
+    "parallelismConfig" : {
+      "SpadesMonitorStream" : 1,
+      "SpadesMonitorExecutor*" : 1
+    }
+  },
+  # NOTE(review): KafkaSpoutProvider (added in this same commit) reads this section. It expects the
+  # transaction ZooKeeper servers under "txZkServers" (comma separated host:port list, default
+  # localhost:2181) and requires "schemeCls"; it does not read "transactionZKServers" or
+  # "transactionZKPort". Confirm whether another component consumes the keys below.
+  "dataSourceConfig": {
+    "topic" : "spades_monitor_sandbox",
+    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    "consumerGroupId" : "eagle.consumer",
+    "fetchSize" : 1048586,
+    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+    "SpadesMonitorExecutor" : {
+      "parallelism" : 1,
+      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+      "needValidation" : "true"
+    }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "application": "SpadesMonitor",
+    "dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailHost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "balancePartitionEnabled" : true,
+    #"partitionRefreshIntervalInMin" : 60,
+    #"kafkaStatisticRangeInMin" : 60,
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+    "readHdfsUserCommandPatternFrom" : "file"
+  },
+  "dynamicConfigSource" : {
+    "enabled" : true,
+    "initDelayMillis" : 0,
+    "delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "eagleStoreEnabled": true,
+    "kafka_broker":"127.0.0.1:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/src/main/resources/log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-data-process/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Root logger: level INFO, output goes to the "stdout" appender defined below.
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+# Pattern: ISO8601 timestamp, level, [thread], last two components of the logger name, [line number]: message
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/pom.xml b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
index a6f1b02..13bb0d9 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/pom.xml
+++ b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
@@ -48,7 +48,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-base</artifactId>
+            <artifactId>eagle-data-process</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
deleted file mode 100644
index 2a33460..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.common;
-
-public class Constants {
-	public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
-	public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService";
-	public final static String ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME = "AlertStreamSchemaService";
-	public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService";
-	public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService";
-	public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService";
-	public final static String ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME = "AlertNotificationService";
-	public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
-	public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp";
-
-	public static final String SITE_APPLICATION_SERVICE_ENDPOINT_NAME = "SiteApplicationService";
-	public static final String SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "SiteDescService";
-	public static final String APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME = "ApplicationDescService";
-	public static final String FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "FeatureDescService";
-
-	public static final String TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME = "TopologyExecutionService";
-	public static final String TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME = "TopologyOperationService";
-	public static final String TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME = "TopologyDescriptionService";
-
-	public static final String GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME = "GenericResourceService";
-	
-	public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService";
-
-	public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
-	public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
-	public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
-
-	public static final String URL = "url";
-	public static final String ALERT_SOURCE = "alertSource";
-	public static final String ALERT_MESSAGE = "alertMessage";
-	public static final String SUBJECT = "subject";
-	public static final String ALERT_EXECUTOR_ID = "alertExecutorId";
-	public static final String POLICY_NAME = "policyName";
-	public static final String POLICY_ID = "policyId";
-    public static final String SOURCE_STREAMS = "sourceStreams";
-    public static final String ALERT_EVENT = "alertEvent";
-	public static final String POLICY_DETAIL_URL = "policyDetailUrl";
-	public static final String ALERT_DETAIL_URL = "alertDetailUrl";
-
-	public static final String POLICY_DEFINITION = "policyDefinition";
-	public static final String POLICY_TYPE = "policyType";
-	public static final String STREAM_NAME = "streamName";
-	public static final String ATTR_NAME = "attrName";
-
-	public static final String ALERT_EXECUTOR_CONFIGS = "alertExecutorConfigs";
-	public static final String PARALLELISM = "parallelism";
-	public static final String PARTITIONER = "partitioner";
-	public static final String SOURCE = "source";
-	public static final String PARTITIONSEQ = "partitionSeq";
-	public static final String EXECUTOR_ID = "executorId";
-
-	public enum policyType {
-		siddhiCEPEngine,
-		MachineLearning;
-
-		policyType() {
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
deleted file mode 100644
index a5d506f..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-/**
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
- */
-public enum AttributeType {
-	STRING,
-	LONG,
-	INTEGER,
-	BOOL,
-    FLOAT,
-    DOUBLE
-//    , OBJECT
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
deleted file mode 100644
index cce3aca..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-gc/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-gc/pom.xml b/eagle-gc/pom.xml
index 653cdff..f8615ef 100644
--- a/eagle-gc/pom.xml
+++ b/eagle-gc/pom.xml
@@ -44,7 +44,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.eagle</groupId>
-      <artifactId>eagle-stream-process-api</artifactId>
+      <artifactId>eagle-data-process</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
index acb442e..ebbeee1 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
@@ -29,13 +29,11 @@ import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.common.config.EagleConfigHelper;
-import org.apache.eagle.datastream.*;
 import org.apache.eagle.gc.common.GCConstants;
 import org.apache.eagle.gc.model.GCPausedEvent;
 import org.apache.eagle.metric.reportor.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.util.*;
 
@@ -122,4 +120,4 @@ public class GCMetricGeneratorBolt extends BaseRichBolt {
             collector.ack(input);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml
index 44269b0..f9cfed5 100644
--- a/eagle-jpm/eagle-hadoop-queue/pom.xml
+++ b/eagle-jpm/eagle-hadoop-queue/pom.xml
@@ -44,7 +44,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
+            <artifactId>eagle-data-process</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <exclusion>
@@ -88,4 +88,4 @@
     </build>
 
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
index a285a6b..caf99ad 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
@@ -23,12 +23,10 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.common.config.ConfigOptionParser;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
index b8a5b45..693fdd1 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
@@ -24,7 +24,7 @@ package org.apache.eagle.hadoop.queue.crawler;
 import backtype.storm.spout.SpoutOutputCollector;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
index be06e12..ab56508 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -23,7 +23,7 @@ package org.apache.eagle.hadoop.queue.crawler;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.model.applications.App;
 import org.apache.eagle.hadoop.queue.model.applications.Apps;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index 99d83d4..82f433e 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.hadoop.queue.crawler;
 
 import backtype.storm.spout.SpoutOutputCollector;
-import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
 import org.apache.eagle.hadoop.queue.model.scheduler.*;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 5568284..bfd4cf2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -33,6 +33,14 @@
     <url>http://maven.apache.org</url>
     <dependencies>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
@@ -48,22 +56,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.wso2.orbit.com.lmax</groupId>
-                    <artifactId>disruptor</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-base</artifactId>
+            <artifactId>eagle-data-process</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <exclusion>
@@ -140,4 +133,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
index 447a59a..c99891b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.mr.history.common;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.common.config.ConfigOptionParser;
 import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
 import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index ff0c8c8..6f85149 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.crawler;
 
-import org.apache.eagle.dataproc.core.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
@@ -34,13 +33,11 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
 
 
     private JobHistoryContentFilter m_filter;
-    private EagleOutputCollector m_eagleCollector;
     private JHFConfigManager m_configManager;
 
     public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) {
         this.m_filter = filter;
         this.m_configManager = configManager;
-        this.m_eagleCollector = eagleCollector;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
new file mode 100644
index 0000000..693e876
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
+
+import java.io.Serializable;
+
+/**
+ * expose simple interface for streaming executor to populate output data
+ *
+ */
+public interface EagleOutputCollector extends Serializable{
+	void collect(ValuesArray t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
index e055957..211dd9d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,8 +19,7 @@
 package org.apache.eagle.jpm.mr.history.crawler;
 
 import backtype.storm.spout.SpoutOutputCollector;
-import org.apache.eagle.dataproc.core.EagleOutputCollector;
-import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 
 public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector {
     private SpoutOutputCollector m_collector;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index 27de19c..d97e2a3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -124,7 +123,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
         ObjectMapper objectMapper = new ObjectMapper();
         try {
             LOG.warn("Unknown task type of task attempt execution entity: "+objectMapper.writeValueAsString(entity));
-        } catch (JsonProcessingException e) {
+        } catch (Exception e) {
             LOG.error(e.getMessage(),e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index e409630..414a221 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -38,22 +38,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.wso2.orbit.com.lmax</groupId>
-                    <artifactId>disruptor</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-base</artifactId>
+            <artifactId>eagle-data-process</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <exclusion>
@@ -123,4 +108,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
index 05e7812..a91a493 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -19,9 +19,9 @@
 package org.apache.eagle.jpm.mr.running.config;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.common.config.ConfigOptionParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index 34d8545..64dd1d8 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -35,7 +35,7 @@
       </dependency>
       <dependency>
           <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-stream-process-api</artifactId>
+          <artifactId>eagle-data-process</artifactId>
           <version>${project.version}</version>
           <exclusions>
               <exclusion>
@@ -54,21 +54,6 @@
           <version>${curator.version}</version>
       </dependency>
       <dependency>
-          <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-stream-process-base</artifactId>
-          <version>${project.version}</version>
-          <exclusions>
-              <exclusion>
-                  <groupId>org.wso2.orbit.com.lmax</groupId>
-                  <artifactId>disruptor</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>asm</groupId>
-                  <artifactId>asm</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
-      <dependency>
         <groupId>org.jsoup</groupId>
         <artifactId>jsoup</artifactId>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 668bc02..5988273 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.spark.running;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.common.config.ConfigOptionParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
index da7587d..32392fe 100644
--- a/eagle-security/eagle-metric-collection/pom.xml
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -77,12 +77,7 @@
       </dependency>
       <dependency>
           <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-stream-process-base</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-stream-process-api</artifactId>
+          <artifactId>eagle-data-process</artifactId>
           <version>${project.version}</version>
       </dependency>
       <dependency>
@@ -92,4 +87,4 @@
           <version>0.13.0</version>
       </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
index 4ef50ea..646f2d4 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
@@ -43,15 +43,7 @@ public class EagleMetricCollectorApplication extends StormApplication{
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-            @Override
-            public List<Object> deserialize(byte[] ser) {
-                Object tmp = deserializer.deserialize(ser);
-                Map<String, Object> map = (Map<String, Object>)tmp;
-                if(tmp == null) return null;
-                return Arrays.asList(map.get("user"), map.get("timestamp"));
-            }
-        };
+        KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config);
 
         TopologyBuilder builder = new TopologyBuilder();
         BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
index 14b2384..e9bc381 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
@@ -20,11 +20,11 @@ import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.apache.eagle.datastream.utils.NameConstants;
 
 import java.lang.reflect.Constructor;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -35,7 +35,7 @@ import java.util.Properties;
  */
 public class KafkaSourcedSpoutScheme implements Scheme {
 	protected SpoutKafkaMessageDeserializer deserializer;
-	
+
 	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
 		try{
 			Properties prop = new Properties();
@@ -48,25 +48,21 @@ public class KafkaSourcedSpoutScheme implements Scheme {
 			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
 		}
 	}
-	
+
 	@Override
 	public List<Object> deserialize(byte[] ser) {
 		Object tmp = deserializer.deserialize(ser);
-		if(tmp == null)
-			return null;
-		// the following tasks are executed within the same process of kafka spout
-		return Arrays.asList(tmp);
+		Map<String, Object> map = (Map<String, Object>)tmp;
+		if(tmp == null) return null;
+		return Arrays.asList(map.get("user"), map.get("timestamp"));
 	}
 
     /**
      * Default only f0, but it requires to be overrode if different
-     *
-     * TODO: Handle the schema with KeyValue based structure
-     *
      * @return Fields
      */
 	@Override
 	public Fields getOutputFields() {
-        return new Fields(NameConstants.FIELD_PREFIX()+"0");
+        return new Fields(NameConstants.FIELD_PREFIX+"0");
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java
new file mode 100644
index 0000000..cdf6b38
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metric.kafka;
+
+/**
+ * Since 8/18/16.
+ */
+public class NameConstants {
+    public static final String FIELD_PREFIX = "f";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java
index 28c2764..21aad05 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java
@@ -20,7 +20,7 @@ package org.apache.eagle.metric.kafka;
 
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.junit.Ignore;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java
index 0f7a720..eb4715a 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java
@@ -20,8 +20,8 @@ package org.apache.eagle.metric.kafka;
 
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.partition.DataDistributionDao;
-import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao;
+import org.apache.eagle.dataproc.impl.storm.partition.PartitionAlgorithm;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.junit.Ignore;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml
index bff3897..186c909 100644
--- a/eagle-security/eagle-security-common/pom.xml
+++ b/eagle-security/eagle-security-common/pom.xml
@@ -33,7 +33,7 @@
 	</dependency>
   	<dependency>
   		<groupId>org.apache.eagle</groupId>
-  		<artifactId>eagle-stream-process-api</artifactId>
+  		<artifactId>eagle-data-process</artifactId>
         <version>${project.version}</version>
   	</dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 24deb65..23282d2 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -20,10 +20,10 @@
 package org.apache.eagle.security.partition;
 
 import com.sun.jersey.api.client.WebResource;
+import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao;
+import org.apache.eagle.dataproc.impl.storm.partition.Weight;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.metric.MetricConstants;
-import org.apache.eagle.partition.DataDistributionDao;
-import org.apache.eagle.partition.Weight;
 import org.apache.eagle.service.client.EagleServiceClientException;
 import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
 import org.apache.eagle.service.client.IEagleServiceClient;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
index 0197393..fa32d55 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
@@ -19,8 +19,8 @@
 
 package org.apache.eagle.security.partition;
 
-import org.apache.eagle.partition.PartitionAlgorithm;
-import org.apache.eagle.partition.Weight;
+import org.apache.eagle.dataproc.impl.storm.partition.PartitionAlgorithm;
+import org.apache.eagle.dataproc.impl.storm.partition.Weight;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index 3d452b4..bd23643 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -31,11 +31,7 @@ import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.sink.StormStreamSink;
 import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping;
-import org.apache.eagle.partition.DataDistributionDao;
-import org.apache.eagle.partition.PartitionAlgorithm;
-import org.apache.eagle.partition.PartitionStrategy;
-import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.dataproc.impl.storm.partition.*;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
new file mode 100644
index 0000000..0585663
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+/**
+ * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+ */
+public enum AttributeType {
+	STRING,
+	LONG,
+	INTEGER,
+	BOOL,
+    FLOAT,
+    DOUBLE
+//    , OBJECT
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index 2ce0dc1..e06143c 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -20,7 +20,6 @@ package org.apache.eagle.security.auditlog;
 
 import backtype.storm.task.OutputCollector;
 import com.typesafe.config.Config;
-import org.apache.eagle.policy.siddhi.AttributeType;
 import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.slf4j.Logger;
@@ -189,4 +188,4 @@ public class HdfsUserCommandReassembler {
         }
         return siddhiEvent;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
index 12d0732..812540e 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
@@ -22,7 +22,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
-import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
 import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;


Mime
View raw message