eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/3] incubator-eagle git commit: [EAGLE-402] Refactor Application Framework to be better managed or deployed standalone
Date Wed, 03 Aug 2016 15:25:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
deleted file mode 100644
index 05aa8c0..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkDesc.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkDesc;
-
-public class DefaultStreamSinkDesc implements StreamSinkDesc {
-    private final Class<?> streamPersistClass;
-    private final static String NONE_STORAGE_TYPE = "NONE";
-
-    public DefaultStreamSinkDesc(Class<?> streamPersistClass){
-        this.streamPersistClass = streamPersistClass;
-    }
-
-    @Override
-    public String getType() {
-        return NONE_STORAGE_TYPE;
-    }
-
-    public Class<?> getSinkClass() {
-        return streamPersistClass;
-    }
-
-    @Override
-    public Class<? extends StreamSinkDesc> getDescClass() {
-        return DefaultStreamSinkDesc.class;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
new file mode 100644
index 0000000..fe3562b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FlattenEventMapper  implements StreamEventMapper{
+    private final String streamId;
+    private final static String TIMESTAMP_FIELD = "timestamp";
+    private final static Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
+
+    public FlattenEventMapper(String streamId){
+        this.streamId = streamId;
+    }
+
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        long timestamp;
+        if(tuple.getFields().contains(TIMESTAMP_FIELD)) {
+            try {
+                timestamp = tuple.getLongByField("timestamp");
+            } catch (Exception ex) {
+                // if timestamp is not null
+                LOGGER.error(ex.getMessage(),ex);
+                timestamp = 0;
+            }
+        } else {
+            timestamp = System.currentTimeMillis();
+        }
+        Object[] values = new Object[tuple.getFields().size()];
+        for(int i=0;i<tuple.getFields().size();i++){
+            values[i] = tuple.getValue(i);
+        }
+        StreamEvent event = new StreamEvent();
+        event.setTimestamp(timestamp);
+        event.setStreamId(streamId);
+        event.setData(values);
+        return Collections.singletonList(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 5f8c176..dda58d6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,26 +17,21 @@
 package org.apache.eagle.app.sink;
 
 import backtype.storm.task.TopologyContext;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-public class KafkaStreamSink extends AbstractStreamSink<KafkaStreamSinkDesc> {
+public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
     private String topicId;
 
     @Override
-    public KafkaStreamSinkDesc init(StreamDefinition streamDefinition, ApplicationContext context) {
-        this.topicId = String.format("EAGLE.%s.%s",
-                context.getAppEntity().getSite().getSiteId(),
-                streamDefinition.getStreamId()).toLowerCase();
-        KafkaStreamSinkDesc desc = new KafkaStreamSinkDesc();
-        desc.setTopicId(topicId);
-        return desc;
+    public void init(String streamId, KafkaStreamSinkConfig config) {
+        super.init(streamId, config);
+        this.topicId = config.getTopicId();
     }
 
     @Override
@@ -50,17 +45,8 @@ public class KafkaStreamSink extends AbstractStreamSink<KafkaStreamSinkDesc> {
         LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
     }
 
-//    @Override
-//    public Map<String, Object> getSink() {
-//        return new HashMap<String,Object>(){
-//            {
-//                put("kafka.topic",KafkaStreamSink.this.topicId);
-//            }
-//        };
-//    }
-
     @Override
-    public void onAppInstall() {
+    public void onInstall() {
         ensureTopicCreated();
     }
 
@@ -73,7 +59,24 @@ public class KafkaStreamSink extends AbstractStreamSink<KafkaStreamSinkDesc> {
     }
 
     @Override
-    public void onAppUninstall() {
+    public void onUninstall() {
         ensureTopicDeleted();
     }
+
+    public static class Provider implements StreamSinkProvider<KafkaStreamSink,KafkaStreamSinkConfig> {
+        @Override
+        public KafkaStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
+            String topicId = String.format("EAGLE.%s.%s",
+                    appConfig.getSiteId(),
+                    streamId).toLowerCase();
+            KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
+            desc.setTopicId(topicId);
+            return desc;
+        }
+
+        @Override
+        public KafkaStreamSink getSink() {
+            return new KafkaStreamSink();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
new file mode 100644
index 0000000..17a3aa8
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+public class KafkaStreamSinkConfig implements StreamSinkConfig {
+    private String topicId;
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    @Override
+    public String getType() {
+        return "KAFKA";
+    }
+
+    @Override
+    public Class<?> getSinkType() {
+        return KafkaStreamSink.class;
+    }
+
+    @Override
+    public Class<? extends StreamSinkConfig> getConfigType() {
+        return KafkaStreamSinkConfig.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
deleted file mode 100644
index 440f67c..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkDesc.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkDesc;
-
-public class KafkaStreamSinkDesc implements StreamSinkDesc {
-    private String topicId;
-
-    public String getTopicId() {
-        return topicId;
-    }
-
-    public void setTopicId(String topicId) {
-        this.topicId = topicId;
-    }
-
-    @Override
-    public String getType() {
-        return "KAFKA";
-    }
-
-    @Override
-    public Class<?> getSinkClass() {
-        return KafkaStreamSink.class;
-    }
-
-    @Override
-    public Class<? extends StreamSinkDesc> getDescClass() {
-        return KafkaStreamSinkDesc.class;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 64c019e..3efd811 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -18,11 +18,11 @@ package org.apache.eagle.app.sink;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LoggingStreamSink extends AbstractStreamSink<DefaultStreamSinkDesc> {
+public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig> {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
 
     @Override
@@ -31,17 +31,25 @@ public class LoggingStreamSink extends AbstractStreamSink<DefaultStreamSinkDesc>
     }
 
     @Override
-    public void onAppInstall() {
-        LOGGER.info("Executing onAppInstall callback, do nothing");
+    public void onInstall() {
+        LOGGER.info("Executing onInstall callback, do nothing");
     }
 
     @Override
-    public void onAppUninstall() {
-        LOGGER.info("Executing onAppUninstall callback, do nothing");
+    public void onUninstall() {
+        LOGGER.info("Executing onUninstall callback, do nothing");
     }
 
-    @Override
-    public DefaultStreamSinkDesc init(StreamDefinition streamDefinition, ApplicationContext context) {
-        return new DefaultStreamSinkDesc(LoggingStreamSink.class);
+
+    public static class Provider implements StreamSinkProvider<LoggingStreamSink,DefaultStreamSinkConfig> {
+        @Override
+        public DefaultStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
+            return new DefaultStreamSinkConfig(LoggingStreamSink.class);
+        }
+
+        @Override
+        public LoggingStreamSink getSink() {
+            return new LoggingStreamSink();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
new file mode 100644
index 0000000..1b35a99
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBasicBolt implements StreamSink<K> {
+    private final static Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
+    private final static String KEY_FIELD = "KEY";
+    final static String VALUE_FIELD = "VALUE";
+    private StreamEventMapper streamEventMapper = null;
+    private String streamId;
+
+    @Override
+    public void init(String streamId, K config) {
+        this.streamId = streamId;
+        this.streamEventMapper = new FlattenEventMapper(streamId);
+    }
+
+    public StormStreamSink<K> setEventMapper(StreamEventMapper eventMapper){
+        this.streamEventMapper = eventMapper;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        super.prepare(stormConf, context);
+        Preconditions.checkNotNull(this.streamEventMapper);
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        try {
+            List<StreamEvent> streamEvents = this.streamEventMapper.map(input);
+            if(streamEvents!=null) {
+                streamEvents.forEach((streamEvent -> {
+                    try {
+                        onEvent(streamEvent);
+                    } catch (Exception e) {
+                        LOG.error("Failed to execute event {}", streamEvent);
+                        collector.reportError(e);
+                    }
+                }));
+            }
+        } catch (Exception e) {
+                LOG.error("Failed to execute event {}",input);
+                collector.reportError(e);
+        }
+    }
+
+    protected abstract void onEvent(StreamEvent streamEvent);
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
new file mode 100644
index 0000000..350e274
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.io.Serializable;
+import java.util.List;
+
+@FunctionalInterface
+public interface StreamEventMapper extends Serializable{
+    /**
+     * @param tuple
+     * @return
+     * @throws Exception
+     */
+    List<StreamEvent> map(Tuple tuple) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
index e98e8bd..ee2790c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
@@ -16,31 +16,9 @@
  */
 package org.apache.eagle.app.sink;
 
-import backtype.storm.topology.base.BaseBasicBolt;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.app.ApplicationContext;
-import org.apache.eagle.app.ApplicationLifecycleListener;
-import org.apache.eagle.app.sink.mapper.StreamEventMapper;
-import org.apache.eagle.metadata.model.StreamSinkDesc;
+import org.apache.eagle.app.ApplicationLifecycle;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
 
-public abstract class StreamSink<T extends StreamSinkDesc> extends BaseBasicBolt implements ApplicationLifecycleListener {
-    /**
-     * Should only initialize metadata in this method but must not open any resource like connection
-     *
-     * @param streamDefinition
-     * @param context
-     */
-    public abstract T init(StreamDefinition streamDefinition, ApplicationContext context);
-
-    public abstract StreamSink<T> setEventMapper(StreamEventMapper streamEventMapper);
-
-    @Override
-    public void onAppStart() {
-        // StreamSink by default will do nothing when application start
-    }
-
-    @Override
-    public void onAppStop() {
-        // StreamSink by default will do nothing when application start
-    }
+public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
+    void init(String streamId,T config);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
new file mode 100644
index 0000000..7c64e50
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
+    /**
+     * @param streamId
+     * @param appConfig
+     * @return
+     */
+    D getSinkConfig(String streamId, Configuration appConfig);
+    S getSink();
+
+    default S getSink(String streamId, Configuration appConfig){
+        S s = getSink();
+        s.init(streamId,getSinkConfig(streamId,appConfig));
+        return s;
+    }
+
+    default Class<? extends S> getSinkType(){
+        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+    }
+
+    default Class<? extends D> getSinkConfigType(){
+        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
deleted file mode 100644
index 631e02a..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-public interface DirectEventMapper extends StreamEventMapper{}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
deleted file mode 100644
index aa60414..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class FieldIndexDirectEventMapper implements DirectEventMapper {
-    private final int[] fieldIndexs;
-
-    public FieldIndexDirectEventMapper(int ... fieldIndexs){
-        this.fieldIndexs = fieldIndexs;
-    }
-
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        List<StreamEvent> events = new ArrayList<>(fieldIndexs.length);
-        for(int index:fieldIndexs){
-            events.add((StreamEvent) tuple.getValue(index));
-        }
-        return events;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
deleted file mode 100644
index bbb4a11..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class FieldNameDirectEventMapper implements DirectEventMapper {
-    private final String[] fieldNames;
-
-    public FieldNameDirectEventMapper(String ... fieldNames){
-        this.fieldNames = fieldNames;
-    }
-
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        List<StreamEvent> events = new ArrayList<>(fieldNames.length);
-        for(String fieldName:fieldNames){
-            events.add((StreamEvent) tuple.getValueByField(fieldName));
-        }
-        return events;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
deleted file mode 100644
index cb8ee33..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class FlattenEventMapper implements StreamEventMapper {
-
-    private final StreamDefinition streamDefinition;
-    private final TimestampSelector timestampSelector;
-
-    private final static String DEFAULT_TIMESTAMP_COLUMN_NAME = "timestamp";
-    private final static Logger LOG = LoggerFactory.getLogger(FlattenEventMapper.class);
-
-    public FlattenEventMapper(StreamDefinition streamDefinition, TimestampSelector timestampSelector){
-        this.streamDefinition = streamDefinition;
-        this.timestampSelector = timestampSelector;
-    }
-
-    public FlattenEventMapper(StreamDefinition streamDefinition, String timestampFieldName){
-        this.streamDefinition = streamDefinition;
-        this.timestampSelector = tuple -> tuple.getLongByField(timestampFieldName);
-    }
-
-    public FlattenEventMapper(StreamDefinition streamDefinition){
-        this(streamDefinition,DEFAULT_TIMESTAMP_COLUMN_NAME);
-    }
-
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        Long timestamp = 0L;
-        try {
-            timestamp = timestampSelector.apply(tuple);
-        } catch (Throwable fieldNotExistException){
-            if(streamDefinition.isTimeseries()) {
-                LOG.error("Stream (streamId = {}) is time series, but failed to detect timestamp, treating as {}", streamDefinition.getStreamId(), timestamp, fieldNotExistException);
-            } else{
-                /// Ignored for non-timeseries stream
-            }
-        }
-
-        StreamEvent streamEvent = new StreamEvent(streamDefinition.getStreamId(),
-                timestamp,
-                this.streamDefinition.getColumns().stream().map((column) -> {
-                    Object value = null;
-                    try {
-                        value = tuple.getValueByField(column.getName());
-                    }catch (IllegalArgumentException fieldNotExistException){
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("Column '{}' of stream {} not exist in {}, treating as null", column.getName(), streamDefinition.getStreamId(), tuple, fieldNotExistException);
-                        }
-                    }
-                    return value;
-                }).toArray());
-        return Collections.singletonList(streamEvent);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
deleted file mode 100644
index 4f27423..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.io.Serializable;
-import java.util.List;
-
-@FunctionalInterface
-public interface StreamEventMapper extends Serializable{
-    /**
-     * @param tuple
-     * @return
-     * @throws Exception
-     */
-    List<StreamEvent> map(Tuple tuple) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
deleted file mode 100644
index cac66e7..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink.mapper;
-
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-import java.util.function.Function;
-
-@FunctionalInterface
-public interface TimestampSelector extends Function<Tuple,Long>,Serializable{}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 37311eb..c451550 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -34,8 +34,6 @@ import java.util.List;
 
 public abstract class AbstractApplicationProvider<T extends Application> implements ApplicationProvider<T> {
     private final static Logger LOG = LoggerFactory.getLogger(AbstractApplicationProvider.class);
-    private final static String APPLICATIONS_SINK_TYPE_PROPS_KEY = "application.sink.type";
-    private final static String DEFAULT_APPLICATIONS_SINK_TYPE = KafkaStreamSink.class.getCanonicalName();
     private final ApplicationDesc applicationDesc;
 
     public AbstractApplicationProvider(){
@@ -74,17 +72,17 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
     @Override
     public void prepare(ApplicationProviderConfig providerConfig, Config envConfig) {
         this.applicationDesc.setJarPath(providerConfig.getJarPath());
-        String sinkClassName = envConfig.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY) ?
-                envConfig.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
-        try {
-            Class<?> sinkClass = Class.forName(sinkClassName);
-            if(!StreamSink.class.isAssignableFrom(sinkClass)){
-                throw new IllegalStateException(sinkClassName+ "is not assignable from "+StreamSink.class.getCanonicalName());
-            }
-            applicationDesc.setSinkClass(sinkClass);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalStateException(e.getMessage(),e.getCause());
-        }
+//        String sinkClassName = envConfig.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY) ?
+//                envConfig.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
+//        try {
+//            Class<?> sinkClass = Class.forName(sinkClassName);
+//            if(!StreamSink.class.isAssignableFrom(sinkClass)){
+//                throw new IllegalStateException(sinkClassName+ "is not assignable from "+StreamSink.class.getCanonicalName());
+//            }
+//            applicationDesc.setSinkClass(sinkClass);
+//        } catch (ClassNotFoundException e) {
+//            throw new IllegalStateException(e.getMessage(),e.getCause());
+//        }
     }
 
     protected void setVersion(String version) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
deleted file mode 100644
index 14edbf8..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.tools.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.junit.Assert;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AppSimulatorImpl extends ApplicationSimulator {
-    private final Config config;
-    private final SiteResource siteResource;
-    private final ApplicationResource applicationResource;
-
-    @Inject
-    public AppSimulatorImpl(Config config, SiteResource siteResource,ApplicationResource applicationResource){
-        this.config = config;
-        this.siteResource = siteResource;
-        this.applicationResource = applicationResource;
-    }
-
-    @Override
-    public void submit(String appType) {
-        submit(appType, new HashMap<>());
-    }
-
-    @Override
-    public void submit(String appType, Map<String, Object> appConfig) {
-        SiteEntity siteEntity = getUniqueSite();
-        siteResource.createSite(siteEntity);
-        Assert.assertNotNull(siteEntity.getUuid());
-        // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
-        // Start application
-        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
-    }
-
-    private final static AtomicInteger incr = new AtomicInteger();
-
-    private SiteEntity getUniqueSite(){
-        // Create local site
-        SiteEntity siteEntity = new SiteEntity();
-        siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
-        siteEntity.setSiteName(siteEntity.getSiteId());
-        siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
-        return siteEntity;
-    }
-
-    @Override
-    public void submit(Class<? extends ApplicationProvider> appProviderClass) {
-        submit(appProviderClass, new HashMap<>());
-    }
-
-    @Override
-    public void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) {
-        try {
-            ApplicationProvider applicationProvider = appProviderClass.newInstance();
-            applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
-            submit(applicationProvider.getApplicationDesc().getType(),appConfig);
-        } catch (InstantiationException | IllegalAccessException e) {
-            throw new IllegalStateException(e.getMessage(),e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
index 7882a9b..051b974 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
@@ -28,6 +28,6 @@ public class AppTestGuiceModule extends AbstractModule{
         install(new CommonGuiceModule());
         install(new ApplicationGuiceModule());
         install(new MemoryMetadataStore());
-        bind(ApplicationSimulator.class).to(AppSimulatorImpl.class).in(Singleton.class);
+        bind(ServerSimulator.class).to(ServerSimulatorImpl.class).in(Singleton.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
deleted file mode 100644
index f8114d2..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Guice;
-import com.google.inject.Module;
-import org.apache.eagle.app.spi.ApplicationProvider;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Application test simulator for developer to quickly run application without diving into application lifecycle
- */
-public abstract class ApplicationSimulator {
-    /**
-     *
-     * @param appType
-     */
-    public abstract void submit(String appType);
-
-    /**
-     *
-     * @param appType
-     * @param appConfig
-     */
-    public abstract void submit(String appType, Map<String,Object> appConfig);
-
-    /**
-     *
-     * @param appProviderClass
-     */
-    public abstract void submit(Class<? extends ApplicationProvider> appProviderClass);
-
-    /**
-     *
-     * @param appProviderClass
-     * @param appConfig
-     */
-    public abstract void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
-
-    public static ApplicationSimulator getInstance(){
-        return Guice.createInjector(new AppTestGuiceModule()).getInstance(ApplicationSimulator.class);
-    }
-
-    /**
-     * @param modules additional modules
-     * @return ApplicationSimulator instance
-     */
-    public static ApplicationSimulator getInstance(Module ... modules){
-        List<Module> contextModules = Arrays.asList(modules);
-        contextModules.add(new AppTestGuiceModule());
-        return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
new file mode 100644
index 0000000..a91af77
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Module;
+import org.apache.eagle.app.spi.ApplicationProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Application test simulator for developer to quickly run application without diving into application lifecycle
+ */
+public abstract class ServerSimulator {
+    /**
+     *
+     * @param appType
+     */
+    public abstract void start(String appType);
+
+    /**
+     *
+     * @param appType
+     * @param appConfig
+     */
+    public abstract void start(String appType, Map<String,Object> appConfig);
+
+    /**
+     *
+     * @param appProviderClass
+     */
+    public abstract void start(Class<? extends ApplicationProvider> appProviderClass);
+
+    /**
+     *
+     * @param appProviderClass
+     * @param appConfig
+     */
+    public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
+
+    public static ServerSimulator getInstance(){
+        return Guice.createInjector(new AppTestGuiceModule()).getInstance(ServerSimulator.class);
+    }
+
+    /**
+     * @param modules additional modules
+     * @return ServerSimulator instance
+     */
+    public static ServerSimulator getInstance(Module ... modules){
+        List<Module> contextModules = Arrays.asList(modules);
+        contextModules.add(new AppTestGuiceModule());
+        return Guice.createInjector(contextModules).getInstance(ServerSimulator.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
new file mode 100644
index 0000000..2c686d9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ServerSimulatorImpl extends ServerSimulator {
+    private final Config config;
+    private final SiteResource siteResource;
+    private final ApplicationResource applicationResource;
+
+    @Inject
+    public ServerSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){
+        this.config = config;
+        this.siteResource = siteResource;
+        this.applicationResource = applicationResource;
+    }
+
+    @Override
+    public void start(String appType) {
+        start(appType, new HashMap<>());
+    }
+
+    @Override
+    public void start(String appType, Map<String, Object> appConfig) {
+        SiteEntity siteEntity = getUniqueSite();
+        siteResource.createSite(siteEntity);
+        Assert.assertNotNull(siteEntity.getUuid());
+        // Install application
+        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
+        // Start application
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+    }
+
+    private final static AtomicInteger incr = new AtomicInteger();
+
+    private SiteEntity getUniqueSite(){
+        // Create local site
+        SiteEntity siteEntity = new SiteEntity();
+        siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
+        siteEntity.setSiteName(siteEntity.getSiteId());
+        siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
+        return siteEntity;
+    }
+
+    @Override
+    public void start(Class<? extends ApplicationProvider> appProviderClass) {
+        start(appProviderClass, new HashMap<>());
+    }
+
+    @Override
+    public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) {
+        try {
+            ApplicationProvider applicationProvider = appProviderClass.newInstance();
+            applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
+            start(applicationProvider.getApplicationDesc().getType(),appConfig);
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(),e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/tools/DynamicJarPathFinder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/tools/DynamicJarPathFinder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/tools/DynamicJarPathFinder.java
deleted file mode 100644
index fce72fe..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/tools/DynamicJarPathFinder.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.tools;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-
-/**
- * http://stackoverflow.com/questions/1983839/determine-which-jar-file-a-class-is-from
- * https://github.com/rzwitserloot/lombok.patcher/blob/master/src/lombok/patcher/inject/LiveInjector.java
- */
-public class DynamicJarPathFinder {
-    private final static Logger LOG = LoggerFactory.getLogger(DynamicJarPathFinder.class);
-    /**
-     * If the provided class has been loaded from a jar file that is on the local file system, will find the absolute path to that jar file.
-     *
-     * @param context The jar file that contained the class file that represents this class will be found. Specify {@code null} to let {@code LiveInjector}
-     *                find its own jar.
-     * @throws IllegalStateException If the specified class was loaded from a directory or in some other way (such as via HTTP, from a database, or some
-     *                               other custom classloading device).
-     */
-    public static String findPathJar(Class<?> context) throws IllegalStateException {
-        if (context == null) context = DynamicJarPathFinder.class;
-        String rawName = context.getName();
-        String classFileName;
-    /* rawName is something like package.name.ContainingClass$ClassName. We need to turn this into ContainingClass$ClassName.class. */ {
-            int idx = rawName.lastIndexOf('.');
-            classFileName = (idx == -1 ? rawName : rawName.substring(idx+1)) + ".class";
-        }
-
-        String uri = context.getResource(classFileName).toString();
-        if (uri.startsWith("file:")) {
-            throw new IllegalStateException("This class has been loaded from a directory and not from a jar file.");
-        }
-        if (!uri.startsWith("jar:file:")) {
-            int idx = uri.indexOf(':');
-            String protocol = idx == -1 ? "(unknown)" : uri.substring(0, idx);
-            throw new IllegalStateException("This class has been loaded remotely via the " + protocol +
-                    " protocol. Only loading from a jar on the local file system is supported.");
-        }
-
-        int idx = uri.indexOf('!');
-        //As far as I know, the if statement below can't ever trigger, so it's more of a sanity check thing.
-        if (idx == -1) throw new IllegalStateException("You appear to have loaded this class from a local jar file, but I can't make sense of the URL!");
-
-        try {
-            String fileName = URLDecoder.decode(uri.substring("jar:file:".length(), idx), Charset.defaultCharset().name());
-            return new File(fileName).getAbsolutePath();
-        } catch (UnsupportedEncodingException e) {
-            throw new InternalError("default charset doesn't exist. Your VM is borked.");
-        }
-    }
-
-    /**
-     * Similar to JarPathFinder, but not make sure the path must valid jar.
-     *
-     * @see DynamicJarPathFinder#findPathJar(Class)
-     * @return the class path contains the context class
-     */
-    public static String findPath(Class<?> context) throws IllegalStateException {
-        if (context == null) context = DynamicJarPathFinder.class;
-        String rawName = context.getName();
-        String classFileName;
-    /* rawName is something like package.name.ContainingClass$ClassName. We need to turn this into ContainingClass$ClassName.class. */ {
-            int idx = rawName.lastIndexOf('.');
-            classFileName = (idx == -1 ? rawName : rawName.substring(idx+1)) + ".class";
-        }
-
-        String uri = context.getResource(classFileName).toString();
-        if (uri.startsWith("file:")) {
-            LOG.warn("This class has been loaded from a directory and not from a jar file: {}",uri);
-            String fileName = null;
-            try {
-                fileName = URLDecoder.decode(uri.substring("file:".length(), uri.length()), Charset.defaultCharset().name());
-                return new File(fileName).getAbsolutePath();
-            } catch (UnsupportedEncodingException e) {
-                throw new InternalError("default charset doesn't exist. Your VM is borked.");
-            }
-        }
-
-        if (!uri.startsWith("jar:file:")) {
-            int idx = uri.indexOf(':');
-            String protocol = idx == -1 ? "(unknown)" : uri.substring(0, idx);
-            throw new IllegalStateException("This class has been loaded remotely via the " + protocol +
-                    " protocol. Only loading from a jar on the local file system is supported.");
-        }
-
-        int idx = uri.indexOf('!');
-        //As far as I know, the if statement below can't ever trigger, so it's more of a sanity check thing.
-        if (idx == -1) {
-            throw new IllegalStateException("You appear to have loaded this class from a local jar file, but I can't make sense of the URL!");
-        }
-
-        try {
-            String fileName = URLDecoder.decode(uri.substring("jar:file:".length(), idx), Charset.defaultCharset().name());
-            return new File(fileName).getAbsolutePath();
-        } catch (UnsupportedEncodingException e) {
-            throw new InternalError("default charset doesn't exist. Your VM is borked.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ApplicationConfigHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ApplicationConfigHelper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ApplicationConfigHelper.java
new file mode 100644
index 0000000..ac73da7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ApplicationConfigHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+
+import java.util.Map;
+
+public class ApplicationConfigHelper {
+    private static final ObjectMapper mapper = new ObjectMapper();
+    public static <Conf extends Configuration> Conf convertFrom(Map<String,Object> configMap, Class<Conf> confClass){
+        return mapper.convertValue(configMap,confClass);
+    }
+
+    /**
+     *  Map application configuration from environment
+     *
+     * @param config
+     * @return
+     */
+    public static Map<String,Object> unwrapFrom(Config config, String namespace){
+        if(config.hasPath(namespace)) {
+            return config.getConfig(namespace).root().unwrapped();
+        }else{
+            Map<String,Object> unwrappedConfig = config.root().unwrapped();
+            if(unwrappedConfig.containsKey(namespace)){
+                return (Map<String,Object>) unwrappedConfig.get(namespace);
+            }else {
+                throw new IllegalArgumentException("Failed to load app config as config key: '"+namespace+"' was not found in: "+config);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/DynamicJarPathFinder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/DynamicJarPathFinder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/DynamicJarPathFinder.java
new file mode 100644
index 0000000..a0a7d50
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/DynamicJarPathFinder.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+
+/**
+ * http://stackoverflow.com/questions/1983839/determine-which-jar-file-a-class-is-from
+ * https://github.com/rzwitserloot/lombok.patcher/blob/master/src/lombok/patcher/inject/LiveInjector.java
+ */
+public class DynamicJarPathFinder {
+    private final static Logger LOG = LoggerFactory.getLogger(DynamicJarPathFinder.class);
+    /**
+     * If the provided class has been loaded from a jar file that is on the local file system, will find the absolute path to that jar file.
+     *
+     * @param context The jar file that contained the class file that represents this class will be found. Specify {@code null} to let {@code LiveInjector}
+     *                find its own jar.
+     * @throws IllegalStateException If the specified class was loaded from a directory or in some other way (such as via HTTP, from a database, or some
+     *                               other custom classloading device).
+     */
+    public static String findPathJar(Class<?> context) throws IllegalStateException {
+        if (context == null) context = DynamicJarPathFinder.class;
+        String rawName = context.getName();
+        String classFileName;
+    /* rawName is something like package.name.ContainingClass$ClassName. We need to turn this into ContainingClass$ClassName.class. */ {
+            int idx = rawName.lastIndexOf('.');
+            classFileName = (idx == -1 ? rawName : rawName.substring(idx+1)) + ".class";
+        }
+
+        String uri = context.getResource(classFileName).toString();
+        if (uri.startsWith("file:")) {
+            throw new IllegalStateException("This class has been loaded from a directory and not from a jar file.");
+        }
+        if (!uri.startsWith("jar:file:")) {
+            int idx = uri.indexOf(':');
+            String protocol = idx == -1 ? "(unknown)" : uri.substring(0, idx);
+            throw new IllegalStateException("This class has been loaded remotely via the " + protocol +
+                    " protocol. Only loading from a jar on the local file system is supported.");
+        }
+
+        int idx = uri.indexOf('!');
+        //As far as I know, the if statement below can't ever trigger, so it's more of a sanity check thing.
+        if (idx == -1) throw new IllegalStateException("You appear to have loaded this class from a local jar file, but I can't make sense of the URL!");
+
+        try {
+            String fileName = URLDecoder.decode(uri.substring("jar:file:".length(), idx), Charset.defaultCharset().name());
+            return new File(fileName).getAbsolutePath();
+        } catch (UnsupportedEncodingException e) {
+            throw new InternalError("default charset doesn't exist. Your VM is borked.");
+        }
+    }
+
+    /**
+     * Similar to JarPathFinder, but not make sure the path must valid jar.
+     *
+     * @see DynamicJarPathFinder#findPathJar(Class)
+     * @return the class path contains the context class
+     */
+    public static String findPath(Class<?> context) throws IllegalStateException {
+        if (context == null) context = DynamicJarPathFinder.class;
+        String rawName = context.getName();
+        String classFileName;
+    /* rawName is something like package.name.ContainingClass$ClassName. We need to turn this into ContainingClass$ClassName.class. */ {
+            int idx = rawName.lastIndexOf('.');
+            classFileName = (idx == -1 ? rawName : rawName.substring(idx+1)) + ".class";
+        }
+
+        String uri = context.getResource(classFileName).toString();
+        if (uri.startsWith("file:")) {
+            LOG.warn("This class has been loaded from a directory and not from a jar file: {}",uri);
+            String fileName = null;
+            try {
+                fileName = URLDecoder.decode(uri.substring("file:".length(), uri.length()), Charset.defaultCharset().name());
+                return new File(fileName).getAbsolutePath();
+            } catch (UnsupportedEncodingException e) {
+                throw new InternalError("default charset doesn't exist. Your VM is borked.");
+            }
+        }
+
+        if (!uri.startsWith("jar:file:")) {
+            int idx = uri.indexOf(':');
+            String protocol = idx == -1 ? "(unknown)" : uri.substring(0, idx);
+            throw new IllegalStateException("This class has been loaded remotely via the " + protocol +
+                    " protocol. Only loading from a jar on the local file system is supported.");
+        }
+
+        int idx = uri.indexOf('!');
+        //As far as I know, the if statement below can't ever trigger, so it's more of a sanity check thing.
+        if (idx == -1) {
+            throw new IllegalStateException("You appear to have loaded this class from a local jar file, but I can't make sense of the URL!");
+        }
+
+        try {
+            String fileName = URLDecoder.decode(uri.substring("jar:file:".length(), idx), Charset.defaultCharset().name());
+            return new File(fileName).getAbsolutePath();
+        } catch (UnsupportedEncodingException e) {
+            throw new InternalError("default charset doesn't exist. Your VM is borked.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
deleted file mode 100644
index aeefd77..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Map;
-
-@Ignore
-public class TestApplicationImpl extends AbstractApplication {
-    private final static Logger LOG = LoggerFactory.getLogger(TestApplicationImpl.class);
-    protected void buildApp(TopologyBuilder builder, ApplicationContext context) {
-        builder.setSpout("metric_spout", new RandomEventSpout(), 4);
-        builder.setBolt("sink_1",context.getFlattenStreamSink("TEST_STREAM_1")).fieldsGrouping("metric_spout",new Fields("metric"));
-        builder.setBolt("sink_2",context.getFlattenStreamSink("TEST_STREAM_2")).fieldsGrouping("metric_spout",new Fields("metric"));
-    }
-
-    private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
-        @Override
-        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
-        }
-
-        @Override
-        public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
-        }
-    }
-
-    public static class Provider extends AbstractApplicationProvider<TestApplicationImpl> {
-        public Provider(){
-            super("TestApplicationMetadata.xml");
-        }
-
-        @Override
-        public TestApplicationImpl getApplication() {
-            return new TestApplicationImpl();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
index de3338e..d318abb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
@@ -1,83 +1,77 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import com.google.inject.Inject;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.test.ApplicationSimulator;
-import org.apache.eagle.app.test.AppUnitTestRunner;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.Collection;
-
-@RunWith(AppUnitTestRunner.class)
-public class TestApplicationTestSuite {
-    @Inject private SiteResource siteResource;
-    @Inject private ApplicationResource applicationResource;
-    @Inject private ApplicationSimulator simulator;
-
-    @Test
-    public void testApplicationProviderLoading(){
-        Collection<ApplicationDesc> applicationDescs = applicationResource.getApplicationDescs().getData();
-        Assert.assertNotNull(applicationDescs);
-        Assert.assertEquals(1,applicationDescs.size());
-    }
-
-    @Test
-    public void testApplicationLifecycle() throws InterruptedException {
-        // Create local site
-        SiteEntity siteEntity = new SiteEntity();
-        siteEntity.setSiteId("test_site");
-        siteEntity.setSiteName("Test Site");
-        siteEntity.setDescription("Test Site for ExampleApplicationTest");
-        siteResource.createSite(siteEntity);
-        Assert.assertNotNull(siteEntity.getUuid());
-
-        // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
-        // Start application
-        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
-        // Stop application
-        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
-        // Uninstall application
-        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
-        try {
-            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
-            Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");
-        } catch (Exception ex){
-            // Expected exception
-        }
-    }
-
-    @Test
-    public void testApplicationQuickRunWithAppType(){
-        simulator.submit("TEST_APPLICATION");
-    }
-
-    @Test
-    public void testApplicationQuickRunWithAppProvider(){
-        simulator.submit(TestApplicationImpl.Provider.class);
-    }
-}
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// * <p/>
+// * http://www.apache.org/licenses/LICENSE-2.0
+// * <p/>
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.eagle.app;
+//
+//import com.google.inject.Inject;
+//import org.apache.eagle.app.resource.ApplicationResource;
+//import org.apache.eagle.app.service.ApplicationOperations;
+//import org.apache.eagle.app.test.ServerSimulator;
+//import org.apache.eagle.app.test.AppUnitTestRunner;
+//import org.apache.eagle.metadata.model.ApplicationDesc;
+//import org.apache.eagle.metadata.model.ApplicationEntity;
+//import org.apache.eagle.metadata.model.SiteEntity;
+//import org.apache.eagle.metadata.resource.SiteResource;
+//import org.junit.Assert;
+//import org.junit.Test;
+//import org.junit.runner.RunWith;
+//
+//import java.util.Collection;
+//
+//@RunWith(AppUnitTestRunner.class)
+//public class TestApplicationTestSuite {
+//    @Inject private SiteResource siteResource;
+//    @Inject private ApplicationResource applicationResource;
+//    @Inject private ServerSimulator simulator;
+//
+//    @Test
+//    public void testApplicationProviderLoading(){
+//        Collection<ApplicationDesc> applicationDescs = applicationResource.getApplicationDescs().getData();
+//        Assert.assertNotNull(applicationDescs);
+//        Assert.assertEquals(1,applicationDescs.size());
+//    }
+//
+//    @Test
+//    public void testApplicationLifecycle() throws InterruptedException {
+//        // Create local site
+//        SiteEntity siteEntity = new SiteEntity();
+//        siteEntity.setSiteId("test_site");
+//        siteEntity.setSiteName("Test Site");
+//        siteEntity.setDescription("Test Site for ExampleApplicationTest");
+//        siteResource.createSite(siteEntity);
+//        Assert.assertNotNull(siteEntity.getUuid());
+//
+//        // Install application
+//        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
+//        // Start application
+//        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+//        // Stop application
+//        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+//        // Uninstall application
+//        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+//        try {
+//            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
+//            Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");
+//        } catch (Exception ex){
+//            // Expected exception
+//        }
+//    }
+//
+//    @Test
+//    public void testApplicationQuickRunWithAppType(){
+//        simulator.start("TEST_APPLICATION");
+//    }
+//}
\ No newline at end of file


Mime
View raw message