eagle-commits mailing list archives

From h..@apache.org
Subject [05/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:13 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
deleted file mode 100644
index 8038d42..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.spout.Scheme;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, KafkaStreamSinkConfig,KafkaStreamSource,KafkaStreamSourceConfig> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class);
-    private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic";
-    private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic";
-    private static final String DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY = "dataSourceConfig.schemeCls";
-
-    private String getSinkTopicName(String streamId, Config config) {
-        String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId);
-        if (config.hasPath(streamSpecificTopicConfigKey)) {
-            return config.getString(streamSpecificTopicConfigKey);
-        } else if (config.hasPath(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)) {
-            LOG.warn("Using default shared sink topic {}: {}", DEFAULT_SHARED_SINK_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY));
-            return config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
-        } else {
-            LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
-            throw new IllegalArgumentException("Neither stream specific topic: "
-                + streamSpecificTopicConfigKey + " nor default shared topic: " + DEFAULT_SHARED_SINK_TOPIC_CONF_KEY + " found in config");
-        }
-    }
-
-    private String getSourceTopicName(String streamId, Config config) {
-        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);
-        if (config.hasPath(streamSpecificTopicConfigKey)) {
-            return config.getString(streamSpecificTopicConfigKey);
-        } else if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) {
-            LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY));
-            return config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY);
-        } else {
-            LOG.debug("Neither stream specific topic: {} nor default shared topic: {} found in config, trying sink config instead", streamSpecificTopicConfigKey, DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY);
-            return getSinkTopicName(streamId,config);
-        }
-    }
-
-    private String getSourceSchemeCls(String streamId, Config config) {
-        String streamSpecificSchemeClsKey = String.format("dataSourceConfig.%s.schemeCls", streamId);
-        if (config.hasPath(streamSpecificSchemeClsKey) ) {
-            return config.getString(streamSpecificSchemeClsKey);
-        } else if (config.hasPath(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY)) {
-            LOG.warn("Using default shared source scheme class {}: {}", DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY, config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY));
-            return config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY);
-        }
-        return null;
-    }
-
-    @Override
-    public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
-        KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
-        sinkConfig.setTopicId(getSinkTopicName(streamId,config));
-        sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList"));
-        sinkConfig.setSerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.serializerClass")
-            ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder");
-        sinkConfig.setKeySerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.keySerializerClass")
-            ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder");
-
-        // newly added properties for the async producer
-        sinkConfig.setNumBatchMessages(hasNonBlankConfigPath(config, "dataSinkConfig.numBatchMessages")
-            ? config.getString("dataSinkConfig.numBatchMessages") : "1024");
-        sinkConfig.setProducerType(hasNonBlankConfigPath(config, "dataSinkConfig.producerType")
-            ? config.getString("dataSinkConfig.producerType") : "async");
-        sinkConfig.setMaxQueueBufferMs(hasNonBlankConfigPath(config, "dataSinkConfig.maxQueueBufferMs")
-            ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000");
-        sinkConfig.setRequestRequiredAcks(hasNonBlankConfigPath(config, "dataSinkConfig.requestRequiredAcks")
-            ? config.getString("dataSinkConfig.requestRequiredAcks") : "1");
-
-        return sinkConfig;
-    }
-
-    @Override
-    public KafkaStreamSink getSink() {
-        return new KafkaStreamSink();
-    }
-
-    private boolean hasNonBlankConfigPath(Config config, String configName) {
-        return config.hasPath(configName) && StringUtils.isNotBlank(config.getString(configName));
-    }
-
-    @Override
-    public KafkaStreamSourceConfig getSourceConfig(String streamId, Config config) {
-        KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig();
-
-        sourceConfig.setTopicId(getSourceTopicName(streamId,config));
-        sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection"));
-
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.fetchSize")) {
-            sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.transactionZKRoot")) {
-            sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.consumerGroupId")) {
-            sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.brokerZkPath")) {
-            sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.txZkServers")) {
-            sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.transactionStateUpdateMS")) {
-            sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.startOffsetTime")) {
-            sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime"));
-        }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.forceFromStart")) {
-            sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
-        }
-        String schemeCls = getSourceSchemeCls(streamId, config);
-        if (schemeCls != null && StringUtils.isNotBlank(schemeCls)) {
-            try {
-                sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(schemeCls));
-            } catch (ClassNotFoundException e) {
-                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}", schemeCls, e);
-            }
-        }
-        return sourceConfig;
-    }
-
-    @Override
-    public KafkaStreamSource getSource() {
-        return new KafkaStreamSource();
-    }
-}
\ No newline at end of file
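
A minimal usage sketch of the deleted provider above (the stream id and topic names are hypothetical, not taken from any real Eagle configuration): the stream-specific key "dataSinkConfig.<streamId>.topic" wins over the shared "dataSinkConfig.topic".

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.messaging.KafkaStreamProvider;
import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;

public class SinkTopicResolutionSketch {
    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
            "dataSinkConfig.topic = shared_topic\n"
            + "dataSinkConfig.ALERT_STREAM.topic = alert_topic\n"
            + "dataSinkConfig.brokerList = \"localhost:9092\"");
        // The stream-specific topic takes precedence; unset producer options keep
        // the defaults filled in by getSinkConfig (async, acks=1, batch of 1024, ...).
        KafkaStreamSinkConfig sinkConfig =
            new KafkaStreamProvider().getSinkConfig("ALERT_STREAM", config);
        System.out.println(sinkConfig.getTopicId()); // prints "alert_topic"
    }
}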

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
deleted file mode 100644
index 696d79f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
-    private String topicId;
-    private Producer producer;
-    private KafkaStreamSinkConfig config;
-
-    @Override
-    public void init(String streamId, KafkaStreamSinkConfig config) {
-        super.init(streamId, config);
-        this.topicId = config.getTopicId();
-        this.config = config;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        super.prepare(stormConf, context, collector);
-        Properties properties = new Properties();
-        properties.put("metadata.broker.list", config.getBrokerList());
-        properties.put("serializer.class", config.getSerializerClass());
-        properties.put("key.serializer.class", config.getKeySerializerClass());
-        // newly added properties for the async producer
-        properties.put("producer.type", config.getProducerType());
-        properties.put("batch.num.messages", config.getNumBatchMessages());
-        properties.put("request.required.acks", config.getRequestRequiredAcks());
-        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
-        ProducerConfig producerConfig = new ProducerConfig(properties);
-        producer = new Producer(producerConfig);
-    }
-
-    @Override
-    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
-        try {
-            String output = new ObjectMapper().writeValueAsString(event);
-            // partition key may cause data skew
-            //producer.send(new KeyedMessage(this.topicId, key, output));
-            producer.send(new KeyedMessage(this.topicId, output));
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            throw ex;
-        }
-    }
-
-    @Override
-    public void afterInstall() {
-        ensureTopicCreated();
-    }
-
-    private void ensureTopicCreated() {
-        LOG.info("TODO: ensure kafka topic {} created", this.topicId);
-    }
-
-    private void ensureTopicDeleted() {
-        LOG.info("TODO: ensure kafka topic {} deleted", this.topicId);
-    }
-
-    @Override
-    public void cleanup() {
-        if (this.producer != null) {
-            this.producer.close();
-        }
-    }
-
-    @Override
-    public void afterUninstall() {
-        ensureTopicDeleted();
-    }
-}
\ No newline at end of file
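
For context, a minimal sketch (topology and component names are hypothetical) of how this sink is obtained through KafkaStreamProvider and attached to a Storm topology; prepare() then builds the legacy Kafka 0.8 producer from the resolved settings.

import backtype.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.messaging.KafkaStreamProvider;
import org.apache.eagle.app.messaging.KafkaStreamSink;

public class KafkaSinkWiringSketch {
    public static void main(String[] args) {
        Config config = ConfigFactory.load(); // assumed to carry the dataSinkConfig.* entries
        // getSink(streamId, config) returns a sink whose init() already set topic and producer options.
        KafkaStreamSink sink = new KafkaStreamProvider().getSink("ALERT_STREAM", config);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setBolt("alertKafkaSink", sink)      // the producer itself is created in prepare()
               .shuffleGrouping("alertPublisher");   // hypothetical upstream component
    }
}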

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
deleted file mode 100644
index bdc4f53..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-import java.util.Objects;
-
-/**
- * FIXME Rename to KafkaStreamMessagingConfig.
- */
-public class KafkaStreamSinkConfig implements StreamSinkConfig {
-    // Write Config
-    private String topicId;
-    private String brokerList;
-    private String serializerClass;
-    private String keySerializerClass;
-    private String numBatchMessages;
-    private String maxQueueBufferMs;
-    private String producerType;
-    private String requestRequiredAcks;
-
-    public String getTopicId() {
-        return topicId;
-    }
-
-    public void setTopicId(String topicId) {
-        this.topicId = topicId;
-    }
-
-    public String getBrokerList() {
-        return brokerList;
-    }
-
-    public void setBrokerList(String brokerList) {
-        this.brokerList = brokerList;
-    }
-
-    public String getSerializerClass() {
-        return serializerClass;
-    }
-
-    public void setSerializerClass(String serializerClass) {
-        this.serializerClass = serializerClass;
-    }
-
-    public String getKeySerializerClass() {
-        return keySerializerClass;
-    }
-
-    public void setKeySerializerClass(String keySerializerClass) {
-        this.keySerializerClass = keySerializerClass;
-    }
-
-    public String getNumBatchMessages() {
-        return numBatchMessages;
-    }
-
-    public void setNumBatchMessages(String numBatchMessages) {
-        this.numBatchMessages = numBatchMessages;
-    }
-
-    public String getMaxQueueBufferMs() {
-        return maxQueueBufferMs;
-    }
-
-    public void setMaxQueueBufferMs(String maxQueueBufferMs) {
-        this.maxQueueBufferMs = maxQueueBufferMs;
-    }
-
-    public String getProducerType() {
-        return producerType;
-    }
-
-    public void setProducerType(String producerType) {
-        this.producerType = producerType;
-    }
-
-    public String getRequestRequiredAcks() {
-        return requestRequiredAcks;
-    }
-
-    public void setRequestRequiredAcks(String requestRequiredAcks) {
-        this.requestRequiredAcks = requestRequiredAcks;
-    }
-
-    @Override
-    public String getType() {
-        return "KAFKA";
-    }
-
-    @Override
-    public Class<?> getSinkType() {
-        return KafkaStreamSink.class;
-    }
-
-    @Override
-    public Class<? extends StreamSinkConfig> getConfigType() {
-        return KafkaStreamSinkConfig.class;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof KafkaStreamSinkConfig)) {
-            return false;
-        }
-
-        KafkaStreamSinkConfig config = (KafkaStreamSinkConfig) o;
-
-        if (!getTopicId().equals(config.getTopicId())) {
-            return false;
-        }
-        if (getBrokerList() != null ? !getBrokerList().equals(config.getBrokerList()) : config.getBrokerList() != null) {
-            return false;
-        }
-        if (getSerializerClass() != null ? !getSerializerClass().equals(config.getSerializerClass()) : config.getSerializerClass() != null) {
-            return false;
-        }
-        if (getKeySerializerClass() != null ? !getKeySerializerClass().equals(config.getKeySerializerClass()) : config.getKeySerializerClass() != null) {
-            return false;
-        }
-        if (getNumBatchMessages() != null ? !getNumBatchMessages().equals(config.getNumBatchMessages()) : config.getNumBatchMessages() != null) {
-            return false;
-        }
-        if (getMaxQueueBufferMs() != null ? !getMaxQueueBufferMs().equals(config.getMaxQueueBufferMs()) : config.getMaxQueueBufferMs() != null) {
-            return false;
-        }
-        if (getProducerType() != null ? !getProducerType().equals(config.getProducerType()) : config.getProducerType() != null) {
-            return false;
-        }
-        return getRequestRequiredAcks() != null ? getRequestRequiredAcks().equals(config.getRequestRequiredAcks()) : config.getRequestRequiredAcks() == null;
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = getTopicId().hashCode();
-        result = 31 * result + (getBrokerList() != null ? getBrokerList().hashCode() : 0);
-        result = 31 * result + (getSerializerClass() != null ? getSerializerClass().hashCode() : 0);
-        result = 31 * result + (getKeySerializerClass() != null ? getKeySerializerClass().hashCode() : 0);
-        result = 31 * result + (getNumBatchMessages() != null ? getNumBatchMessages().hashCode() : 0);
-        result = 31 * result + (getMaxQueueBufferMs() != null ? getMaxQueueBufferMs().hashCode() : 0);
-        result = 31 * result + (getProducerType() != null ? getProducerType().hashCode() : 0);
-        result = 31 * result + (getRequestRequiredAcks() != null ? getRequestRequiredAcks().hashCode() : 0);
-        return result;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
deleted file mode 100644
index 5cc5145..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.spout.SchemeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSource.class);
-    private KafkaSpout spout;
-
-    @Override
-    public void init(String streamId, KafkaStreamSourceConfig config) {
-        this.spout = createKafkaSpout(config);
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.spout.open(conf, context, collector);
-    }
-
-    @Override
-    public void nextTuple() {
-        this.spout.nextTuple();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        this.spout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public void close() {
-        this.spout.close();
-    }
-
-    @Override
-    public void activate() {
-        this.spout.activate();
-    }
-
-    @Override
-    public void deactivate() {
-        this.spout.deactivate();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        this.spout.ack(msgId);
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        this.spout.fail(msgId);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return this.spout.getComponentConfiguration();
-    }
-
-    // ----------------
-    //  Helper Methods
-    // ----------------
-
-    private static KafkaSpout createKafkaSpout(KafkaStreamSourceConfig config) {
-
-        // the following is for fetching data from one topic
-        // Kafka topic
-        String topic = config.getTopicId();
-        // Kafka broker zk connection
-        String zkConnString = config.getBrokerZkQuorum();
-        // Kafka fetch size
-        int fetchSize = config.getFetchSize();
-        LOG.info(String.format("Use topic : %s, zkQuorum : %s , fetchSize : %d", topic, zkConnString, fetchSize));
-
-        /*
-         the following is for recording offset for processing the data
-         the zk path to store current offset is comprised of the following
-         offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id
-
-         consumerGroupId is for differentiating different consumers which consume the same topic
-        */
-        // transaction zkRoot
-        String zkRoot = config.getTransactionZKRoot();
-        // Kafka consumer group id
-        String groupId = config.getConsumerGroupId();
-        String brokerZkPath = config.getBrokerZkPath();
-
-        BrokerHosts hosts;
-        if (StringUtils.isBlank(brokerZkPath)) {
-            hosts = new ZkHosts(zkConnString);
-        } else {
-            hosts = new ZkHosts(zkConnString, brokerZkPath);
-        }
-
-        SpoutConfig spoutConfig = new SpoutConfig(hosts,
-            topic,
-            zkRoot + "/" + topic,
-            groupId);
-
-        // transaction zkServers to store kafka consumer offset. Default to use storm zookeeper
-        if (StringUtils.isNotBlank(config.getTransactionZkServers())) {
-            String[] txZkServers = config.getTransactionZkServers().split(",");
-            spoutConfig.zkServers = Arrays.stream(txZkServers).map(server -> server.split(":")[0]).collect(Collectors.toList());
-            spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]);
-            LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort);
-        }
-
-        // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = config.getTransactionStateUpdateMS();
-        // Kafka fetch size
-        spoutConfig.fetchSizeBytes = fetchSize;
-        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-
-        // "startOffsetTime" is for test usage, prod should not use this
-        if (config.getStartOffsetTime() >= 0) {
-            spoutConfig.startOffsetTime = config.getStartOffsetTime();
-        }
-        // "forceFromStart" is for test usage, prod should not use this
-        if (config.isForceFromStart()) {
-            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-        }
-
-        Preconditions.checkNotNull(config.getSchemaClass(), "schemaClass is null");
-        try {
-            Scheme s = config.getSchemaClass().newInstance();
-            spoutConfig.scheme = new SchemeAsMultiScheme(s);
-        } catch (Exception ex) {
-            LOG.error("Error instantiating scheme object");
-            throw new IllegalStateException(ex);
-        }
-        return new KafkaSpout(spoutConfig);
-    }
-}
\ No newline at end of file
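
One non-obvious step above is how the comma-separated transaction ZooKeeper list is turned into host names plus a single port. A minimal sketch of the same transformation, using a hypothetical server list:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TxZkServersParsingSketch {
    public static void main(String[] args) {
        // Hypothetical value of dataSourceConfig.txZkServers
        String txZkServers = "zk1.example.com:2181,zk2.example.com:2181";
        String[] servers = txZkServers.split(",");
        List<String> zkServers = Arrays.stream(servers)
            .map(server -> server.split(":")[0])
            .collect(Collectors.toList());                   // [zk1.example.com, zk2.example.com]
        // Only the port of the first entry is used for every server.
        int zkPort = Integer.parseInt(servers[0].split(":")[1]); // 2181
        System.out.println(zkServers + " -> port " + zkPort);
    }
}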

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
deleted file mode 100644
index d0a91da..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.metadata.model.StreamSourceConfig;
-
-public class KafkaStreamSourceConfig implements StreamSourceConfig {
-    private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource";
-    private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
-    private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class;
-
-    // Read Config
-    private String topicId;
-    private String brokerZkQuorum;
-    private String brokerZkBasePath;
-    private String transactionZkServers;
-
-    private int fetchSize = 1048576;
-    private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT;
-    private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID;
-    private String brokerZkPath = "/brokers";
-    private long transactionStateUpdateMS = 2000;
-    private int startOffsetTime = -1;
-    private boolean forceFromStart = false;
-    private Class<? extends backtype.storm.spout.Scheme> schemaClass = DEFAULT_KAFKA_SCHEMA;
-
-    public String getBrokerZkQuorum() {
-        return brokerZkQuorum;
-    }
-
-    public void setBrokerZkQuorum(String brokerZkQuorum) {
-        this.brokerZkQuorum = brokerZkQuorum;
-    }
-
-    public String getBrokerZkBasePath() {
-        return brokerZkBasePath;
-    }
-
-    public void setBrokerZkBasePath(String brokerZkBasePath) {
-        this.brokerZkBasePath = brokerZkBasePath;
-    }
-
-    public int getFetchSize() {
-        return fetchSize;
-    }
-
-    public void setFetchSize(int fetchSize) {
-        this.fetchSize = fetchSize;
-    }
-
-    public String getTransactionZKRoot() {
-        return transactionZKRoot;
-    }
-
-    public void setTransactionZKRoot(String transactionZKRoot) {
-        this.transactionZKRoot = transactionZKRoot;
-    }
-
-    public String getConsumerGroupId() {
-        return consumerGroupId;
-    }
-
-    public void setConsumerGroupId(String consumerGroupId) {
-        this.consumerGroupId = consumerGroupId;
-    }
-
-    public String getBrokerZkPath() {
-        return brokerZkPath;
-    }
-
-    public void setBrokerZkPath(String brokerZkPath) {
-        this.brokerZkPath = brokerZkPath;
-    }
-
-    public String getTransactionZkServers() {
-        return transactionZkServers;
-    }
-
-    public void setTransactionZkServers(String transactionZkServers) {
-        this.transactionZkServers = transactionZkServers;
-    }
-
-    public long getTransactionStateUpdateMS() {
-        return transactionStateUpdateMS;
-    }
-
-    public void setTransactionStateUpdateMS(long transactionStateUpdateMS) {
-        this.transactionStateUpdateMS = transactionStateUpdateMS;
-    }
-
-    public int getStartOffsetTime() {
-        return startOffsetTime;
-    }
-
-    public void setStartOffsetTime(int startOffsetTime) {
-        this.startOffsetTime = startOffsetTime;
-    }
-
-    public boolean isForceFromStart() {
-        return forceFromStart;
-    }
-
-    public void setForceFromStart(boolean forceFromStart) {
-        this.forceFromStart = forceFromStart;
-    }
-
-    public Class<? extends backtype.storm.spout.Scheme> getSchemaClass() {
-        return schemaClass;
-    }
-
-    public void setSchemaClass(Class<? extends backtype.storm.spout.Scheme> schemaClass) {
-        this.schemaClass = schemaClass;
-    }
-
-    @Override
-    public String getType() {
-        return "KAFKA";
-    }
-
-    @Override
-    public Class<?> getSourceType() {
-        return KafkaStreamSource.class;
-    }
-
-    @Override
-    public Class<? extends StreamSourceConfig> getConfigType() {
-        return StreamSourceConfig.class;
-    }
-
-    public String getTopicId() {
-        return topicId;
-    }
-
-    public void setTopicId(String topicId) {
-        this.topicId = topicId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof KafkaStreamSourceConfig)) {
-            return false;
-        }
-
-        KafkaStreamSourceConfig that = (KafkaStreamSourceConfig) o;
-
-        if (getFetchSize() != that.getFetchSize()) {
-            return false;
-        }
-        if (getTransactionStateUpdateMS() != that.getTransactionStateUpdateMS()) {
-            return false;
-        }
-        if (getStartOffsetTime() != that.getStartOffsetTime()) {
-            return false;
-        }
-        if (isForceFromStart() != that.isForceFromStart()) {
-            return false;
-        }
-        if (getTopicId() != null ? !getTopicId().equals(that.getTopicId()) : that.getTopicId() != null) {
-            return false;
-        }
-        if (getBrokerZkQuorum() != null ? !getBrokerZkQuorum().equals(that.getBrokerZkQuorum()) : that.getBrokerZkQuorum() != null) {
-            return false;
-        }
-        if (getBrokerZkBasePath() != null ? !getBrokerZkBasePath().equals(that.getBrokerZkBasePath()) : that.getBrokerZkBasePath() != null) {
-            return false;
-        }
-        if (getTransactionZkServers() != null ? !getTransactionZkServers().equals(that.getTransactionZkServers()) : that.getTransactionZkServers() != null) {
-            return false;
-        }
-        if (getTransactionZKRoot() != null ? !getTransactionZKRoot().equals(that.getTransactionZKRoot()) : that.getTransactionZKRoot() != null) {
-            return false;
-        }
-        if (getConsumerGroupId() != null ? !getConsumerGroupId().equals(that.getConsumerGroupId()) : that.getConsumerGroupId() != null) {
-            return false;
-        }
-        if (getBrokerZkPath() != null ? !getBrokerZkPath().equals(that.getBrokerZkPath()) : that.getBrokerZkPath() != null) {
-            return false;
-        }
-        return getSchemaClass() != null ? getSchemaClass().equals(that.getSchemaClass()) : that.getSchemaClass() == null;
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = getTopicId() != null ? getTopicId().hashCode() : 0;
-        result = 31 * result + (getBrokerZkQuorum() != null ? getBrokerZkQuorum().hashCode() : 0);
-        result = 31 * result + (getBrokerZkBasePath() != null ? getBrokerZkBasePath().hashCode() : 0);
-        result = 31 * result + (getTransactionZkServers() != null ? getTransactionZkServers().hashCode() : 0);
-        result = 31 * result + getFetchSize();
-        result = 31 * result + (getTransactionZKRoot() != null ? getTransactionZKRoot().hashCode() : 0);
-        result = 31 * result + (getConsumerGroupId() != null ? getConsumerGroupId().hashCode() : 0);
-        result = 31 * result + (getBrokerZkPath() != null ? getBrokerZkPath().hashCode() : 0);
-        result = 31 * result + (int) (getTransactionStateUpdateMS() ^ (getTransactionStateUpdateMS() >>> 32));
-        result = 31 * result + getStartOffsetTime();
-        result = 31 * result + (isForceFromStart() ? 1 : 0);
-        result = 31 * result + (getSchemaClass() != null ? getSchemaClass().hashCode() : 0);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
deleted file mode 100644
index 90e6481..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.MetricSchemaEntity;
-import org.apache.eagle.app.environment.builder.MetricDescriptor;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MetricSchemaGenerator extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(MetricSchemaGenerator.class);
-    private static int MAX_CACHE_LENGTH = 1000;
-    public static final String GENERIC_METRIC_VALUE_NAME = "value";
-
-    private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH);
-    private final MetricDescriptor metricDescriptor;
-    private final Config config;
-
-    private OutputCollector collector;
-    private IEagleServiceClient client;
-
-    public MetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
-        this.metricDescriptor = metricDescriptor;
-        this.config = config;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-        this.client = new EagleServiceClientImpl(config);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        try {
-            String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD);
-            synchronized (metricNameCache) {
-                if (!metricNameCache.contains(metricName)) {
-                    createMetricSchemaEntity(metricName, (Map) input.getValueByField(MetricStreamPersist.METRIC_EVENT_FIELD),this.metricDescriptor);
-                    metricNameCache.add(metricName);
-                }
-                if (metricNameCache.size() > MAX_CACHE_LENGTH) {
-                    this.metricNameCache.clear();
-                }
-            }
-            this.collector.ack(input);
-        } catch (Throwable throwable) {
-            LOG.warn(throwable.getMessage(), throwable);
-            this.collector.reportError(throwable);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public void cleanup() {
-        if (this.client != null) {
-            try {
-                this.client.close();
-            } catch (IOException e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-    }
-
-    private void createMetricSchemaEntity(String metricName, Map event, MetricDescriptor metricDescriptor) throws IOException, EagleServiceClientException {
-        MetricSchemaEntity schemaEntity = new MetricSchemaEntity();
-        Map<String, String> schemaTags = new HashMap<>();
-        schemaEntity.setTags(schemaTags);
-        schemaTags.put(MetricSchemaEntity.METRIC_SITE_TAG, metricDescriptor.getSiteIdSelector().getSiteId(event));
-        schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName);
-        schemaTags.put(MetricSchemaEntity.METRIC_GROUP_TAG, metricDescriptor.getMetricGroupSelector().getMetricGroup(event));
-        schemaEntity.setGranularityByField(metricDescriptor.getGranularity());
-        schemaEntity.setDimensionFields(metricDescriptor.getDimensionFields());
-        schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME));
-        schemaEntity.setModifiedTimestamp(System.currentTimeMillis());
-        GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(schemaEntity));
-        if (response.isSuccess()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Created {}", schemaEntity);
-            }
-        } else {
-            LOG.error("Failed to create {}", schemaEntity, response.getException());
-            throw new IOException("Service error: " + response.getException());
-        }
-    }
-}
\ No newline at end of file
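
A minimal sketch (component names are hypothetical) of how this bolt is typically chained behind MetricStreamPersist, which appears later in this commit and emits the (metricName, metricEvent) tuples consumed here; the MetricDescriptor and Config are assumed to be supplied by the application.

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import org.apache.eagle.app.environment.builder.MetricDescriptor;
import org.apache.eagle.app.messaging.MetricSchemaGenerator;
import org.apache.eagle.app.messaging.MetricStreamPersist;

public class MetricBoltChainSketch {
    public static TopologyBuilder build(MetricDescriptor metricDescriptor, Config config) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setBolt("metricPersist", new MetricStreamPersist(metricDescriptor, config))
               .shuffleGrouping("metricSource");    // hypothetical upstream component
        // Group by metric name so the generator's per-name cache stays effective.
        builder.setBolt("metricSchemaGenerator", new MetricSchemaGenerator(metricDescriptor, config))
               .fieldsGrouping("metricPersist", new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
        return builder;
    }
}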

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
deleted file mode 100644
index c9b43e5..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.builder.MetricDescriptor;
-import org.apache.eagle.app.utils.StreamConvertHelper;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.BatchSender;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MetricStreamPersist extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
-    public static final String METRIC_NAME_FIELD = "metricName";
-    public static final String METRIC_EVENT_FIELD = "metricEvent";
-
-    private final Config config;
-    private final MetricMapper mapper;
-    private final int batchSize;
-    private IEagleServiceClient client;
-    private OutputCollector collector;
-    private BatchSender batchSender;
-
-    public MetricStreamPersist(MetricDescriptor metricDescriptor, Config config) {
-        this.config = config;
-        this.mapper = new StructuredMetricMapper(metricDescriptor);
-        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
-    }
-
-    public MetricStreamPersist(MetricMapper mapper, Config config) {
-        this.config = config;
-        this.mapper = mapper;
-        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.client = new EagleServiceClientImpl(config);
-        if (this.batchSize > 0) {
-            this.batchSender = client.batch(this.batchSize);
-        }
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        GenericMetricEntity metricEntity = null;
-        Map event = null;
-        try {
-            event = StreamConvertHelper.tupleToEvent(input).f1();
-            metricEntity = this.mapper.map(event);
-            if (batchSize <= 1) {
-                GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
-                if (!response.isSuccess()) {
-                    LOG.error("Service side error: {}", response.getException());
-                    collector.reportError(new IllegalStateException(response.getException()));
-                }
-            } else {
-                this.batchSender.send(metricEntity);
-            }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            collector.reportError(ex);
-        } finally {
-            if (metricEntity != null && event != null) {
-                collector.emit(Arrays.asList(metricEntity.getPrefix(), event));
-            }
-            collector.ack(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(METRIC_NAME_FIELD, METRIC_EVENT_FIELD));
-    }
-
-    @Override
-    public void cleanup() {
-        try {
-            this.client.close();
-        } catch (IOException e) {
-            LOG.error("Close client error: {}", e.getMessage(), e);
-        } finally {
-            super.cleanup();
-        }
-    }
-
-    @FunctionalInterface
-    public interface MetricMapper extends Serializable {
-        GenericMetricEntity map(Map event);
-    }
-
-    public class StructuredMetricMapper implements MetricMapper {
-        private final MetricDescriptor metricDescriptor;
-
-        private StructuredMetricMapper(MetricDescriptor metricDescriptor) {
-            this.metricDescriptor = metricDescriptor;
-        }
-
-        @Override
-        public GenericMetricEntity map(Map event) {
-            String metricName = metricDescriptor.getMetricNameSelector().getMetricName(event);
-            Preconditions.checkNotNull(metricName, "Metric name is null");
-            Long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
-            Preconditions.checkNotNull(timestamp, "Timestamp is null");
-            Map<String, String> tags = new HashMap<>();
-            for (String dimensionField : metricDescriptor.getDimensionFields()) {
-                Preconditions.checkNotNull(dimensionField, "Dimension field name is null");
-                tags.put(dimensionField, (String) event.get(dimensionField));
-            }
-
-            double[] values;
-            if (event.containsKey(metricDescriptor.getValueField())) {
-                values = new double[] {(double) event.get(metricDescriptor.getValueField())};
-            } else {
-                LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
-                values = new double[] {0};
-            }
-
-            GenericMetricEntity entity = new GenericMetricEntity();
-            entity.setPrefix(metricName);
-            entity.setTimestamp(DateTimeUtil.roundDown(metricDescriptor.getGranularity(), timestamp));
-            entity.setTags(tags);
-            entity.setValue(values);
-            return entity;
-        }
-    }
-}
\ No newline at end of file
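
Because MetricMapper is a @FunctionalInterface, a custom mapping can be supplied as a lambda instead of the StructuredMetricMapper above. A minimal sketch, assuming a hypothetical event layout with "metric", "timestamp" and "value" fields:

import com.typesafe.config.Config;
import org.apache.eagle.app.messaging.MetricStreamPersist;
import org.apache.eagle.log.entity.GenericMetricEntity;

public class CustomMetricMapperSketch {
    public static MetricStreamPersist build(Config config) {
        MetricStreamPersist.MetricMapper mapper = event -> {
            GenericMetricEntity entity = new GenericMetricEntity();
            entity.setPrefix((String) event.get("metric"));                     // metric name
            entity.setTimestamp(((Number) event.get("timestamp")).longValue()); // event time in ms
            entity.setValue(new double[] {((Number) event.get("value")).doubleValue()});
            return entity;
        };
        return new MetricStreamPersist(mapper, config);
    }
}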

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
deleted file mode 100644
index ef6c8d6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.eagle.app.utils.StreamConvertHelper;
-import org.apache.eagle.common.utils.Tuple2;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
-    private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
-    private String streamId;
-    private OutputCollector collector;
-
-    @Override
-    public void init(String streamId, K config) {
-        this.streamId = streamId;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    /**
-     * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
-     */
-    @Override
-    public void execute(Tuple input) {
-        try {
-            Tuple2<Object,Map> keyValue = StreamConvertHelper.tupleToEvent(input);
-            execute(keyValue.f0(), keyValue.f1(), collector);
-            collector.ack(input);
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            collector.reportError(ex);
-        }
-    }
-
-    protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
deleted file mode 100644
index b31de46..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import org.apache.eagle.metadata.model.StreamSourceConfig;
-
-public abstract class StormStreamSource<T extends StreamSourceConfig> extends BaseRichSpout implements StreamSource<T> {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
deleted file mode 100644
index 42ea36e..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-import java.util.List;
-
-@FunctionalInterface
-public interface StreamEventMapper extends Serializable {
-    /**
-     * Map from storm tuple to Stream Event.
-     *
-     * @param tuple storm tuple to convert
-     * @return mapped stream events
-     * @throws Exception if the tuple cannot be mapped
-     */
-    List<StreamEvent> map(Tuple tuple) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
deleted file mode 100644
index 0dbc1a7..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import org.apache.eagle.metadata.model.StreamSourceConfig;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * Stream Messaging Bus.
- */
-public interface StreamProvider<W extends StreamSink<C>, C extends StreamSinkConfig,
-        R extends StreamSource<F>, F extends StreamSourceConfig> {
-
-    C getSinkConfig(String streamId, Config config);
-
-    W getSink();
-
-    default W getSink(String streamId, Config config) {
-        W s = getSink();
-        s.init(streamId, getSinkConfig(streamId, config));
-        return s;
-    }
-
-    F getSourceConfig(String streamId, Config config);
-
-    R getSource();
-
-    default R getSource(String streamId, Config config) {
-        R i = getSource();
-        i.init(streamId, getSourceConfig(streamId, config));
-        return i;
-    }
-}
\ No newline at end of file
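
The two default methods are the intended entry points: they resolve the stream-specific config and initialize the sink or source in one call. A minimal hypothetical usage sketch (the wire method and its parameters are illustrative, not part of the repository):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.app.messaging.StreamProvider;
    import org.apache.eagle.app.messaging.StreamSink;
    import org.apache.eagle.app.messaging.StreamSource;
    import org.apache.eagle.metadata.model.StreamSinkConfig;
    import org.apache.eagle.metadata.model.StreamSourceConfig;

    public class ProviderUsageExample {
        // someProvider may be any concrete StreamProvider implementation.
        static <W extends StreamSink<C>, C extends StreamSinkConfig,
                R extends StreamSource<F>, F extends StreamSourceConfig>
                void wire(StreamProvider<W, C, R, F> someProvider, String streamId) {
            Config config = ConfigFactory.load();
            // Each default method resolves the stream-specific config and calls init() in one step.
            W sink = someProvider.getSink(streamId, config);
            R source = someProvider.getSource(streamId, config);
        }
    }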

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
deleted file mode 100644
index f76cdbf..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-public class StreamRecord extends HashMap<String,Object> implements Serializable {
-    public StreamRecord() {
-    }
-
-    public StreamRecord(Map<String,Object> event) {
-        this.putAll(event);
-    }
-}
\ No newline at end of file
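
Since StreamRecord is simply a serializable map of field name to value, building one only requires populating a map. The field names below are arbitrary examples:

    import org.apache.eagle.app.messaging.StreamRecord;

    import java.util.HashMap;
    import java.util.Map;

    public class StreamRecordExample {
        static StreamRecord sample() {
            Map<String, Object> event = new HashMap<>();
            event.put("host", "localhost"); // arbitrary field names, for illustration only
            event.put("timestamp", 0L);
            return new StreamRecord(event);
        }
    }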

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
deleted file mode 100644
index 7ba4a9a..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.app.ApplicationLifecycle;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
-    void init(String streamId,T config);
-}
\ No newline at end of file
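
A concrete sink supplies init plus the ApplicationLifecycle callbacks it inherits through this interface. The hypothetical sketch below is kept abstract so only the StreamSink-specific part is shown:

    import org.apache.eagle.app.messaging.StreamSink;
    import org.apache.eagle.metadata.model.StreamSinkConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical sketch: the ApplicationLifecycle callbacks inherited through
    // StreamSink are deliberately left abstract here.
    public abstract class LoggingStreamSink<T extends StreamSinkConfig> implements StreamSink<T> {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingStreamSink.class);
        private String streamId;
        private T config;

        @Override
        public void init(String streamId, T config) {
            this.streamId = streamId;
            this.config = config;
            LOG.info("Initialized sink for stream {}", streamId);
        }
    }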

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
deleted file mode 100644
index af3965f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.metadata.model.StreamSourceConfig;
-
-public interface StreamSource<T extends StreamSourceConfig> {
-    void init(String streamId, T config);
-}
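
A source implementation mirrors the sink sketch above: init simply captures the stream identity and its typed config. A trivial hypothetical example:

    import org.apache.eagle.app.messaging.StreamSource;
    import org.apache.eagle.metadata.model.StreamSourceConfig;

    // Hypothetical no-op source, for illustration only.
    public class NoopStreamSource<T extends StreamSourceConfig> implements StreamSource<T> {
        private String streamId;
        private T config;

        @Override
        public void init(String streamId, T config) {
            this.streamId = streamId;
            this.config = config;
        }
    }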

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java
deleted file mode 100644
index a74b6f2..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.module;
-
-import org.apache.eagle.app.service.ApplicationProviderService;
-import org.apache.eagle.common.module.ModuleRegistry;
-import org.apache.eagle.common.module.ModuleRegistryImpl;
-import com.google.inject.Guice;
-import com.google.inject.Module;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ApplicationExtensionLoader {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationExtensionLoader.class);
-
-    public static ModuleRegistry load(Module... context) {
-        LOGGER.warn("Loading application extension modules");
-        ModuleRegistry registry = new ModuleRegistryImpl();
-        Guice.createInjector(context).getInstance(ApplicationProviderService.class).getProviders().forEach((provider) -> {
-            LOGGER.warn("Registering modules from {}", provider);
-            provider.register(registry);
-        });
-        return registry;
-    }
-}
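
The loader expects whatever Guice modules can supply an ApplicationProviderService, for example an ApplicationGuiceModule (shown in the next file) plus the metadata/store modules a deployment uses. A minimal hypothetical wrapper showing the call shape:

    import com.google.inject.Module;
    import org.apache.eagle.app.module.ApplicationExtensionLoader;
    import org.apache.eagle.common.module.ModuleRegistry;

    public class ExtensionLoaderExample {
        static ModuleRegistry loadExtensions(Module... contextModules) {
            // contextModules must between them bind ApplicationProviderService;
            // each discovered application provider then registers its own modules.
            return ApplicationExtensionLoader.load(contextModules);
        }
    }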

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java
deleted file mode 100644
index 6c8c310..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.module;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.service.ApplicationHealthCheckService;
-import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.ApplicationProviderService;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckServiceImpl;
-import org.apache.eagle.app.service.impl.ApplicationManagementServiceImpl;
-import org.apache.eagle.app.service.impl.ApplicationProviderServiceImpl;
-import org.apache.eagle.app.service.impl.ApplicationStatusUpdateServiceImpl;
-import org.apache.eagle.metadata.service.ApplicationDescService;
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.util.Providers;
-import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
-
-public class ApplicationGuiceModule extends AbstractModule {
-    private final ApplicationProviderService appProviderInst;
-
-    public ApplicationGuiceModule(ApplicationProviderService appProviderInst) {
-        this.appProviderInst = appProviderInst;
-    }
-
-    public ApplicationGuiceModule() {
-        this.appProviderInst = new ApplicationProviderServiceImpl(ConfigFactory.load());
-    }
-
-    @Override
-    protected void configure() {
-        bind(ApplicationProviderService.class).toProvider(Providers.of(appProviderInst));
-        bind(ApplicationDescService.class).toProvider(Providers.of(appProviderInst));
-        bind(ApplicationManagementService.class).to(ApplicationManagementServiceImpl.class).in(Singleton.class);
-        bind(ApplicationStatusUpdateService.class).to(ApplicationStatusUpdateServiceImpl.class).in(Singleton.class);
-        bind(ApplicationHealthCheckService.class).to(ApplicationHealthCheckServiceImpl.class).in(Singleton.class);
-    }
-}
\ No newline at end of file
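
In a running server this module is combined with the remaining core modules before the injector is built. The hypothetical fragment below only sketches that composition and does not enumerate every module a real deployment needs:

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Module;
    import org.apache.eagle.app.module.ApplicationGuiceModule;

    import java.util.ArrayList;
    import java.util.List;

    public class GuiceWiringExample {
        static Injector createInjector(List<Module> otherCoreModules) {
            List<Module> modules = new ArrayList<>(otherCoreModules);
            // Adds the application-framework bindings declared above; the remaining
            // modules (metadata store, config, web, ...) come from the rest of the server.
            modules.add(new ApplicationGuiceModule());
            return Guice.createInjector(modules);
        }
    }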

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
deleted file mode 100644
index b91a70c..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- * <h1>Application Management Framework Interfaces</h1>
- *
- * <ul>
- *     <li>Application Context (Runtime): org.apache.eagle.app.service.ApplicationOperationContext</li>
- *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
- *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
- *     <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycle</li>
- * </ul>
- */
-package org.apache.eagle.app;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
deleted file mode 100644
index 3c62367..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.resource;
-
-import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.service.ApplicationProviderService;
-import org.apache.eagle.common.rest.RESTResponse;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.service.ApplicationEntityService;
-import com.google.inject.Inject;
-
-import java.util.Collection;
-import javax.ws.rs.*;
-import javax.ws.rs.core.MediaType;
-
-@Path("/apps")
-public class ApplicationResource {
-    private final ApplicationProviderService providerService;
-    private final ApplicationManagementService applicationManagementService;
-    private final ApplicationEntityService entityService;
-
-    @Inject
-    public ApplicationResource(
-        ApplicationProviderService providerService,
-        ApplicationManagementService applicationManagementService,
-        ApplicationEntityService entityService) {
-        this.providerService = providerService;
-        this.applicationManagementService = applicationManagementService;
-        this.entityService = entityService;
-    }
-
-    @GET
-    @Path("/providers")
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Collection<ApplicationDesc>> getApplicationDescs() {
-        return RESTResponse.async(providerService::getApplicationDescs).get();
-    }
-
-    @GET
-    @Path("/providers/{type}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<ApplicationDesc> getApplicationDescByType(@PathParam("type") String type) {
-        return RESTResponse.async(() -> providerService.getApplicationDescByType(type)).get();
-    }
-
-    @PUT
-    @Path("/providers/reload")
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Collection<ApplicationDesc>> reloadApplicationDescs() {
-        return RESTResponse.<Collection<ApplicationDesc>>async((response) -> {
-            providerService.reload();
-            response.message("Successfully reloaded application providers");
-            response.data(providerService.getApplicationDescs());
-        }).get();
-    }
-
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Collection<ApplicationEntity>> getApplicationEntities(@QueryParam("siteId") String siteId) {
-        return RESTResponse.async(() -> {
-            if (siteId == null) {
-                return entityService.findAll();
-            } else {
-                return entityService.findBySiteId(siteId);
-            }
-        }).get();
-    }
-
-    @GET
-    @Path("/{appUuid}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<ApplicationEntity> getApplicationEntityByUUID(@PathParam("appUuid") String appUuid) {
-        return RESTResponse.async(() -> entityService.getByUUID(appUuid)).get();
-    }
-
-    @POST
-    @Path("/{appUuid}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<ApplicationEntity> updateApplicationEntity(@PathParam("appUuid") String appUuid, ApplicationOperations.UpdateOperation updateOperation) {
-        return RESTResponse.async(() -> {
-            ApplicationEntity applicationEntity = new ApplicationEntity();
-            applicationEntity.setStatus(entityService.getByUUID(appUuid).getStatus());
-            applicationEntity.setUuid(appUuid);
-            applicationEntity.setJarPath(updateOperation.getJarPath());
-            applicationEntity.setMode(updateOperation.getMode());
-            applicationEntity.setConfiguration(updateOperation.getConfiguration());
-            return entityService.update(applicationEntity);
-        }).get();
-    }
-
-    @POST
-    @Path("/status")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<ApplicationEntity.Status> checkApplicationStatusByUUID(ApplicationOperations.CheckStatusOperation operation) {
-        return RESTResponse.<ApplicationEntity.Status>async((response) -> {
-            ApplicationEntity.Status status = (entityService.getByUUIDOrAppId(null, operation.getAppId())).getStatus();
-            response.success(true).message("Successfully fetched application status");
-            response.data(status);
-        }).get();
-    }
-
-    /**
-     * <b>Request:</b>
-     * <pre>
-     * {
-     *      uuid: APPLICATION_UUID
-     * }
-     * </pre>
-     */
-    @POST
-    @Path("/install")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<ApplicationEntity> installApplication(ApplicationOperations.InstallOperation operation) {
-        return RESTResponse.<ApplicationEntity>async((response) -> {
-            ApplicationEntity entity = applicationManagementService.install(operation);
-            response.message("Successfully installed application " + operation.getAppType() + " onto site " + operation.getSiteId());
-            response.data(entity);
-        }).get();
-    }
-
-    /**
-     * <b>Request:</b>
-     * <pre>
-     * {
-     *      uuid: APPLICATION_UUID
-     * }
-     * </pre>
-     *
-     * @param operation the uninstall request carrying the application UUID
-     */
-    @DELETE
-    @Path("/uninstall")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> uninstallApplication(ApplicationOperations.UninstallOperation operation) {
-        return RESTResponse.<Void>async((response) -> {
-            ApplicationEntity entity = applicationManagementService.uninstall(operation);
-            response.success(true).message("Successfully uninstalled application " + entity.getUuid());
-        }).get();
-    }
-
-    /**
-     * <b>Request:</b>
-     * <pre>
-     * {
-     *      uuid: APPLICATION_UUID
-     * }
-     * </pre>
-     *
-     * @param operation the start request carrying the application UUID
-     */
-    @POST
-    @Path("/start")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> startApplication(ApplicationOperations.StartOperation operation) {
-        return RESTResponse.<Void>async((response) -> {
-            ApplicationEntity entity = applicationManagementService.start(operation);
-            response.success(true).message("Starting application " + entity.getUuid());
-        }).get();
-    }
-
-    /**
-     * <b>Request:</b>
-     * <pre>
-     * {
-     *      uuid: APPLICATION_UUID
-     * }
-     * </pre>
-     *
-     * @param operation the stop request carrying the application UUID
-     */
-    @POST
-    @Path("/stop")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> stopApplication(ApplicationOperations.StopOperation operation) {
-        return RESTResponse.<Void>async((response) -> {
-            ApplicationEntity entity = applicationManagementService.stop(operation);
-            response.success(true).message("Stopping application " + entity.getUuid());
-        }).get();
-    }
-
-}
\ No newline at end of file
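
For reference, the endpoints above can be exercised with any HTTP client. The hypothetical JAX-RS 2.x client call below targets the install endpoint; the base URL, the JSON property names, and the payload values are deployment-specific assumptions rather than facts taken from this file (only the appType and siteId fields are implied by the install response message above):

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class AppsRestClientExample {
        public static void main(String[] args) {
            Client client = ClientBuilder.newClient();
            // Base URL and JSON body are assumptions; adjust host, port, context path,
            // and fields for the actual deployment.
            String body = "{\"siteId\": \"sandbox\", \"appType\": \"EXAMPLE_APP\"}";
            Response response = client.target("http://localhost:9090/rest/apps/install")
                    .request(MediaType.APPLICATION_JSON)
                    .post(Entity.entity(body, MediaType.APPLICATION_JSON));
            System.out.println("HTTP " + response.getStatus() + ": " + response.readEntity(String.class));
            client.close();
        }
    }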

