eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [3/3] incubator-eagle git commit: [EAGLE-402] Refactor Application Framework to be better managed or deployed standalone
Date Wed, 03 Aug 2016 15:25:27 GMT
[EAGLE-402] Refactor Application Framework to be better managed or deployed standalone

https://issues.apache.org/jira/browse/EAGLE-402

Author: Hao Chen <hao@apache.org>

Closes #296 from haoch/EAGLE-402.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4ad1a418
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4ad1a418
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4ad1a418

Branch: refs/heads/develop
Commit: 4ad1a4182ae6525e338b9f17cc60d1ff39ff2ebe
Parents: 30db8ef
Author: Hao Chen <hao@apache.org>
Authored: Wed Aug 3 23:24:37 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Wed Aug 3 23:24:37 2016 +0800

----------------------------------------------------------------------
 eagle-core/eagle-app/eagle-app-base/pom.xml     |   5 -
 .../apache/eagle/app/AbstractApplication.java   | 210 ++++++----------
 .../java/org/apache/eagle/app/Application.java  |  62 ++++-
 .../apache/eagle/app/ApplicationContext.java    | 238 -------------------
 .../apache/eagle/app/ApplicationLifecycle.java  |  39 +++
 .../eagle/app/ApplicationLifecycleListener.java |  39 ---
 .../org/apache/eagle/app/ApplicationTool.java   |  60 +++++
 .../org/apache/eagle/app/Configuration.java     |  60 +++++
 .../org/apache/eagle/app/StormApplication.java  |  27 +++
 .../app/environment/AbstractEnvironment.java    |  71 ++++++
 .../eagle/app/environment/Environment.java      |  30 +++
 .../eagle/app/environment/ExecutionRuntime.java |  53 +++++
 .../environment/ExecutionRuntimeManager.java    |  92 +++++++
 .../environment/ExecutionRuntimeProvider.java   |  21 ++
 .../app/environment/impl/SparkEnvironment.java  |  29 +++
 .../environment/impl/SparkExecutionRuntime.java |  58 +++++
 .../app/environment/impl/StormEnvironment.java  |  42 ++++
 .../environment/impl/StormExecutionRuntime.java | 162 +++++++++++++
 .../java/org/apache/eagle/app/package-info.java |   4 +-
 .../eagle/app/service/ApplicationContext.java   | 103 ++++++++
 .../impl/ApplicationManagementServiceImpl.java  |  27 ++-
 .../impl/ApplicationProviderSPILoader.java      |   2 +-
 .../eagle/app/sink/AbstractStreamSink.java      |  75 ------
 .../eagle/app/sink/DefaultStreamSinkConfig.java |  42 ++++
 .../eagle/app/sink/DefaultStreamSinkDesc.java   |  42 ----
 .../eagle/app/sink/FlattenEventMapper.java      |  60 +++++
 .../apache/eagle/app/sink/KafkaStreamSink.java  |  47 ++--
 .../eagle/app/sink/KafkaStreamSinkConfig.java   |  46 ++++
 .../eagle/app/sink/KafkaStreamSinkDesc.java     |  46 ----
 .../eagle/app/sink/LoggingStreamSink.java       |  26 +-
 .../apache/eagle/app/sink/StormStreamSink.java  |  88 +++++++
 .../eagle/app/sink/StreamEventMapper.java       |  33 +++
 .../org/apache/eagle/app/sink/StreamSink.java   |  30 +--
 .../eagle/app/sink/StreamSinkProvider.java      |  46 ++++
 .../app/sink/mapper/DirectEventMapper.java      |  19 --
 .../mapper/FieldIndexDirectEventMapper.java     |  40 ----
 .../sink/mapper/FieldNameDirectEventMapper.java |  40 ----
 .../app/sink/mapper/FlattenEventMapper.java     |  78 ------
 .../app/sink/mapper/StreamEventMapper.java      |  33 ---
 .../app/sink/mapper/TimestampSelector.java      |  25 --
 .../app/spi/AbstractApplicationProvider.java    |  24 +-
 .../apache/eagle/app/test/AppSimulatorImpl.java |  89 -------
 .../eagle/app/test/AppTestGuiceModule.java      |   2 +-
 .../eagle/app/test/ApplicationSimulator.java    |  70 ------
 .../apache/eagle/app/test/ServerSimulator.java  |  70 ++++++
 .../eagle/app/test/ServerSimulatorImpl.java     |  89 +++++++
 .../eagle/app/tools/DynamicJarPathFinder.java   | 121 ----------
 .../app/utils/ApplicationConfigHelper.java      |  49 ++++
 .../eagle/app/utils/DynamicJarPathFinder.java   | 121 ++++++++++
 .../apache/eagle/app/TestApplicationImpl.java   |  72 ------
 .../eagle/app/TestApplicationTestSuite.java     | 160 ++++++-------
 .../eagle/app/storm/MockStormApplication.java   |  97 ++++++++
 .../app/storm/MockStormApplicationTest.java     |  76 ++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |   2 +-
 .../test/resources/TestApplicationMetadata.xml  |   2 +-
 .../src/test/resources/application.conf         |   7 +
 .../src/test/resources/providers.xml            |   2 +-
 .../config/AbstractConfigOptionParser.java      |  71 ++++++
 .../eagle/common/config/ConfigOptionParser.java |  67 ++++++
 .../eagle-metadata/eagle-metadata-base/pom.xml  |   1 +
 .../eagle/metadata/model/ApplicationDesc.java   |  16 +-
 .../eagle/metadata/model/ApplicationEntity.java |   7 +-
 .../apache/eagle/metadata/model/StreamDesc.java |   6 +-
 .../eagle/metadata/model/StreamSinkConfig.java  |  25 ++
 .../eagle/metadata/model/StreamSinkDesc.java    |  25 --
 .../eagle-metadata/eagle-metadata-jdbc/pom.xml  |   1 +
 eagle-examples/eagle-app-example/pom.xml        |   5 +
 .../eagle/app/example/ExampleApplication.java   |  56 -----
 .../app/example/ExampleApplicationProvider.java |   6 +-
 .../example/ExampleApplicationProvider2.java    |   8 +-
 .../app/example/ExampleStormApplication.java    |  60 +++++
 .../eagle/app/example/ExampleStormConfig.java   |  20 ++
 .../META-INF/apps/example/metadata.xml          |   2 +-
 .../example/ExampleApplicationProviderTest.java |  96 ++++++++
 .../app/example/ExampleApplicationTest.java     | 145 ++++-------
 .../src/test/java/resources/application.conf    |  58 -----
 .../src/test/resources/application.conf         |  62 +++++
 .../lib/EAGLE/package/patches/app.js            |   4 +-
 eagle-jpm/eagle-jpm-app/pom.xml                 |   5 +
 .../apache/eagle/app/jpm/JPMApplication.java    |  16 +-
 .../apache/eagle/app/jpm/JPMConfiguration.java  |  23 ++
 .../resources/META-INF/apps/jpm/metadata.xml    |   2 +-
 .../eagle/app/jpm/JPMApplicationTest.java       |   1 -
 .../org/apache/eagle/server/ServerMain.java     |  21 +-
 84 files changed, 2480 insertions(+), 1632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml
index 500db86..d89b78c 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -93,11 +93,6 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-metadata-base</artifactId>
-            <version>0.5.0-incubating-SNAPSHOT</version>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index 5001a80..dca2e3d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -1,143 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.NimbusClient;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-import java.io.Serializable;
-
-public abstract class AbstractApplication implements Application,Serializable {
-    private final static Logger LOG = LoggerFactory.getLogger(AbstractApplication.class);
-    private static LocalCluster _localCluster;
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(){
-            @Override
-            public void run() {
-                if(_localCluster != null) {
-                    LOG.info("Shutting down local storm cluster instance");
-                    _localCluster.shutdown();
-                }
-            }
-        });
-    }
-
-    private static LocalCluster getLocalCluster(){
-        if(_localCluster == null){
-            _localCluster = new LocalCluster();
-        }
-
-        return _localCluster;
-    }
-
-    @Override
-    public void start(ApplicationContext context){
-        ApplicationEntity appEntity = context.getAppEntity();
-        String topologyName = context.getAppEntity().getAppId();
-
-        TopologyBuilder builder = new TopologyBuilder();
-        buildApp(builder,context);
-        StormTopology topology = builder.createTopology();
-        Config conf = getClusterStormConfig(context);
-        if(appEntity.getMode() == ApplicationEntity.Mode.CLUSTER){
-            String jarFile = context.getAppEntity().getDescriptor().getJarPath();
-            synchronized (AbstractApplication.class) {
-                System.setProperty("storm.jar", jarFile);
-                LOG.info("Submitting as cluster mode");
-                try {
-                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
-                } catch (AlreadyAliveException | InvalidTopologyException e) {
-                    LOG.error(e.getMessage(), e);
-                    throw new RuntimeException(e.getMessage(),e);
-                } finally {
-                    System.clearProperty("storm.jar");
-                }
-            }
-        }else{
-            LOG.info("Submitting as local mode");
-            getLocalCluster().submitTopology(topologyName, conf, topology);
-        }
-    }
-
-    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
-    private static Config getClusterStormConfig(ApplicationContext context){
-        Config conf = new Config();
-        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
-        conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
-        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
-        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
-        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
-        conf.put(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
-        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-
-        if(context.getEnvConfig().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
-            nimbusHost = context.getEnvConfig().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
-        }
-        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if(context.getEnvConfig().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
-            nimbusThriftPort = context.getEnvConfig().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
-        }
-        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
-        return conf;
-    }
-
-    protected abstract void buildApp(TopologyBuilder builder, ApplicationContext context);
-
-    @Override
-    public void stop(ApplicationContext context) {
-        ApplicationEntity appEntity = context.getAppEntity();
-        String appId = appEntity.getAppId();
-        if(appEntity.getMode() == ApplicationEntity.Mode.CLUSTER){
-            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getClusterStormConfig(context)).getClient();
-            try {
-                stormClient.killTopology(appId);
-            } catch (NotAliveException | TException e) {
-                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
-            }
-        } else {
-            getLocalCluster().killTopology(appId);
-        }
-    }
-
-    @Override
-    public void status(ApplicationContext context) {
-        // TODO: Not implemented yet!
-        throw new RuntimeException("TODO: Not implemented yet!");
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.Environment;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.utils.ApplicationConfigHelper;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Map;
+
+abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
+    private Class<Conf> parametrizedConfigClass;
+
+    @Override
+    public Proc execute(Map<String, Object> config, Env env) {
+        return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
+    }
+
+    /**
+     *  Map application configuration from environment
+     *
+     * @param config
+     * @return
+     */
+    private Conf loadAppConfigFromEnv(Config config){
+        return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
+    }
+
+    @Override
+    public void run(Config config) {
+        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
+    }
+
+    @Override
+    public void run(Configuration conf, Config config) {
+        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
+    }
+
+    @Override
+    public Proc execute(Env environment) {
+        return execute(loadAppConfigFromEnv(environment.config()),environment);
+    }
+
+    /**
+     * @return Config class from Generic Type
+     */
+    public Class<Conf> getConfigType(){
+        if (parametrizedConfigClass == null) {
+            this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+        }
+        return parametrizedConfigClass;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index 4d35e08..699b13e 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,25 +16,69 @@
  */
 package org.apache.eagle.app;
 
+import org.apache.eagle.app.environment.Environment;
+
+import java.io.Serializable;
+import java.util.Map;
+
 /**
  * Application Execution Interface
+ *
+ * <h1>Design Principle</h1>
+ * <ul>
+ *  <li>Easy to develop and extend </li>
+ *  <li>Easy to test and run locally</li>
+ *  <li>Easy to manage lifecycle through framework</li>
+ * </ul>
+ *
+ * @param <Proc>
+ * @param <Conf>
+ * @param <Env>
  */
-public interface Application {
+public interface Application <
+    Conf extends Configuration,     //  Application Configuration
+    Env extends Environment,        // Application Environment
+    Proc                            // Application Process
+> extends Serializable {
     /**
+     * Execute with type-safe configuration
+     *
+     * Developer-oriented interface
      *
-     * @param context
+     * @param config application configuration
+     * @param environment execution environment
+     * @return execution process
      */
-    void start(ApplicationContext context);
+    Proc execute(Conf config, Env environment);
 
     /**
+     * Execute with raw map-based configuration
      *
-     * @param context
+     * Management service oriented interface
+     *
+     * @param config application configuration
+     * @param environment  execution environment
+     * @return execution process
+     */
+    Proc execute(Map<String,Object> config, Env environment);
+
+    /**
+     * Execute with environment based configuration
+     *
+     * Light-weight Runner (dry-run/test purpose) oriented interface
+     *
+     * @param environment  execution environment
+     * @return execution process
+     */
+    Proc execute(Env environment);
+
+    /**
+     * @return application configuration type (POJO class)
      */
-    void stop(ApplicationContext context);
+    Class<Conf> getConfigType();
 
     /**
-     * 
-     * @param context
+     * @return application environment type
      */
-    void status(ApplicationContext context);
+    Class<? extends Env> getEnvironmentType();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
deleted file mode 100644
index cf8646c..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.app.sink.StreamSink;
-import org.apache.eagle.app.sink.mapper.*;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.StreamDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.*;
-
-/**
- * Application Context Interface: org.apache.eagle.app.ApplicationContext
- *
- * <ul>
- *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
- *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
- *     <li>Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycleListener</li>
- * </ul>
- */
-public class ApplicationContext implements Serializable, ApplicationLifecycleListener{
-    private final Config envConfig;
-    private final ApplicationEntity appEntity;
-    private final static Logger LOG = LoggerFactory.getLogger(ApplicationContext.class);
-    private final Map<String, StreamDefinition> streamDefinitionMap;
-    private final Map<String,StreamDesc> streamDescMap;
-    private final List<ApplicationLifecycleListener> applicationLifecycleListeners;
-    private final ApplicationProvider appProvider;
-
-    /**
-     * @param appEntity ApplicationEntity
-     * @param appProvider ApplicationProvider
-     * @param envConfig Config
-     */
-    public ApplicationContext(ApplicationEntity appEntity, ApplicationProvider appProvider, Config envConfig){
-        this.appEntity = appEntity;
-        this.envConfig = envConfig;
-        this.appProvider = appProvider;
-        this.streamDefinitionMap = new HashMap<>();
-        this.streamDescMap = new HashMap<>();
-        this.applicationLifecycleListeners = new LinkedList<>();
-        doInit();
-    }
-
-    public void registerListener(ApplicationLifecycleListener listener){
-        applicationLifecycleListeners.add(listener);
-    }
-
-    public List<ApplicationLifecycleListener> getListeners(){
-        return applicationLifecycleListeners;
-    }
-
-    private void doInit() {
-        Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
-        List<StreamDefinition> outputStreams = appEntity.getDescriptor().getStreams();
-        if(null != outputStreams){
-            outputStreams.forEach((stream) -> {
-                try {
-                    StreamDesc streamDesc = new StreamDesc();
-                    StreamSink streamSink = (StreamSink) sinkClass.newInstance();
-                    streamDesc.setSink(streamSink.init(stream,this));
-                    streamDesc.setSchema(stream);
-                    streamDesc.setStreamId(stream.getStreamId());
-                    streamDescMap.put(streamDesc.getStreamId(),streamDesc);
-                    streamDefinitionMap.put(streamDesc.getStreamId(),stream);
-                    registerListener(streamSink);
-                } catch (InstantiationException | IllegalAccessException e) {
-                    LOG.error("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application: {}",this.getAppEntity());
-                    throw new RuntimeException("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application:"+this.getAppEntity(),e);
-                }
-            });
-        }
-    }
-
-    public ApplicationEntity getAppEntity() {
-        return appEntity;
-    }
-
-    public Config getEnvConfig() {
-        return envConfig;
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getFlattenStreamSink(String streamId, StreamEventMapper mapper){
-        checkStreamExists(streamId);
-        Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
-        try {
-            StreamSink abstractStreamSink = (StreamSink) sinkClass.newInstance();
-            abstractStreamSink.setEventMapper(mapper);
-            abstractStreamSink.init(streamDefinitionMap.get(streamId),this);
-            return abstractStreamSink;
-        } catch (InstantiationException | IllegalAccessException e) {
-            LOG.error("Failed to instantiate "+sinkClass,e);
-            throw new IllegalStateException("Failed to instantiate "+sinkClass,e);
-        }
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getDirectStreamSink(String streamId, String ... fieldNames){
-        return getFlattenStreamSink(streamId,new FieldNameDirectEventMapper(fieldNames));
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getDirectStreamSink(String streamId, int ... fieldIndexs){
-        return getFlattenStreamSink(streamId,new FieldIndexDirectEventMapper(fieldIndexs));
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getFlattenStreamSink(String streamId, TimestampSelector timestampSelector){
-        checkStreamExists(streamId);
-        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampSelector));
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getFlattenStreamSink(String streamId, String timestampField){
-        checkStreamExists(streamId);
-        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampField));
-    }
-
-    /**
-     * Make sure streamId is declared in Application Providers
-     *
-     * @param streamId
-     * @return
-     */
-    public StreamSink getFlattenStreamSink(String streamId){
-        checkStreamExists(streamId);
-        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId)));
-    }
-
-    private void checkStreamExists(String streamId){
-        if(! streamDefinitionMap.containsKey(streamId)){
-            LOG.error("Stream [streamId = "+streamId+"] is not defined in "
-                    + appEntity.getDescriptor().getProviderClass().getCanonicalName());
-            throw new IllegalStateException("Stream [streamId = "+streamId+"] is not defined in "
-                    + appEntity.getDescriptor().getProviderClass().getCanonicalName());
-        }
-    }
-
-    public Collection<StreamDesc> getStreamSinkDescs(){
-        return streamDescMap.values();
-    }
-
-    @Override
-    public void onAppInstall() {
-        getListeners().forEach((listener)->{
-            try {
-                listener.onAppInstall();
-            }catch (Throwable throwable){
-                LOG.error("Failed to invoked onAppInstall of listener {}",listener.toString(),throwable);
-                throw throwable;
-            }
-        });
-    }
-
-    @Override
-    public void onAppUninstall() {
-        getListeners().forEach((listener)->{
-            try {
-                listener.onAppUninstall();
-            }catch (Throwable throwable){
-                LOG.error("Failed to invoked onAppUninstall of listener {}",listener.toString(),throwable);
-                throw throwable;
-            }
-        });
-    }
-
-    @Override
-    public void onAppStart() {
-        getListeners().forEach((listener)->{
-            try {
-                listener.onAppStart();
-            }catch (Throwable throwable){
-                LOG.error("Failed to invoked onAppStart of listener {}",listener.toString(),throwable);
-                throw throwable;
-            }
-        });
-        appProvider.getApplication().start(this);
-    }
-
-    @Override
-    public void onAppStop() {
-        appProvider.getApplication().stop(this);
-        getListeners().forEach((listener)->{
-            try {
-                listener.onAppStop();
-            }catch (Throwable throwable){
-                LOG.error("Failed to invoked onAppStop of listener {}",listener.toString(),throwable);
-                throw throwable;
-            }
-        });
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
new file mode 100644
index 0000000..9e3d992
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+public interface ApplicationLifecycle {
+    /**
+     * on application installed
+     */
+    default void onInstall(){}
+
+    /**
+     * on application uninstalled
+     */
+    default void onUninstall(){}
+
+    /**
+     *
+     */
+    default void onStart(){}
+
+    /**
+     *
+     */
+    default void onStop(){}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
deleted file mode 100644
index 6ced960..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-public interface ApplicationLifecycleListener {
-    /**
-     * on application installed
-     */
-    void onAppInstall();
-
-    /**
-     * on application uninstalled
-     */
-    void onAppUninstall();
-
-    /**
-     *
-     */
-    void onAppStart();
-
-    /**
-     *
-     */
-    void onAppStop();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationTool.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationTool.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationTool.java
new file mode 100644
index 0000000..c0f8723
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationTool.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.cli.ParseException;
+import org.apache.eagle.common.config.ConfigOptionParser;
+
+public interface ApplicationTool<Conf extends Configuration> {
+    /**
+     * Run application through CLI
+     *
+     * @param args application arguments
+     */
+    default void run(String[] args){
+        try {
+            run(new ConfigOptionParser().load(args));
+        } catch (ParseException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /**
+     *
+     * @param config
+     */
+    void run(Config config);
+
+    /**
+     * @param appConf
+     */
+    void run(Conf appConf, Config envConfig);
+
+    /**
+     * @param appConf
+     */
+    default void run(Conf appConf){
+        run(appConf, ConfigFactory.load());
+    }
+
+    default void run(){
+        run(ConfigFactory.load());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Configuration.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Configuration.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Configuration.java
new file mode 100644
index 0000000..9280158
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Configuration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+import java.io.Serializable;
+
+public class Configuration implements Serializable {
+    private ApplicationEntity.Mode mode;
+    private String siteId;
+    private String appId;
+    private String jarPath;
+
+    public ApplicationEntity.Mode getMode() {
+        return mode;
+    }
+
+    public void setMode(ApplicationEntity.Mode mode) {
+        this.mode = mode;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    public String getJarPath() {
+        return jarPath;
+    }
+
+    public void setJarPath(String jarPath) {
+        this.jarPath = jarPath;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/StormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/StormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/StormApplication.java
new file mode 100644
index 0000000..5ae01fa
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/StormApplication.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import backtype.storm.generated.StormTopology;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+public abstract class StormApplication<AppConf extends Configuration> extends AbstractApplication<AppConf,StormEnvironment,StormTopology>{
+    @Override
+    public Class<? extends StormEnvironment> getEnvironmentType() {
+        return StormEnvironment.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
new file mode 100644
index 0000000..6f7fd97
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.app.sink.KafkaStreamSink;
+import org.apache.eagle.app.sink.StreamSinkProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractEnvironment implements Environment {
+    private final Config config;
+    private final StreamSinkProvider sinkProvider;
+    private final static String APPLICATIONS_SINK_TYPE_PROPS_KEY = "application.sink.provider";
+    private final static String DEFAULT_APPLICATIONS_SINK_TYPE = KafkaStreamSink.Provider.class.getName();
+    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractEnvironment.class);
+
+    public AbstractEnvironment(Config config){
+        this.config = config;
+        this.sinkProvider = loadStreamSinkProvider();
+    }
+
+    private StreamSinkProvider loadStreamSinkProvider(){
+        String sinkProviderClassName = config.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY) ?
+                config.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
+        try {
+            Class<?> sinkProviderClass = Class.forName(sinkProviderClassName);
+            if(!StreamSinkProvider.class.isAssignableFrom(sinkProviderClass)){
+                throw new IllegalStateException(sinkProviderClassName+ "is not assignable from "+StreamSinkProvider.class.getCanonicalName());
+            }
+            StreamSinkProvider instance =  (StreamSinkProvider) sinkProviderClass.newInstance();
+            LOGGER.info("Loaded {}",instance);
+            return instance;
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            LOGGER.error(e.getMessage(),e);
+            throw new IllegalStateException(e.getMessage(),e.getCause());
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(this.getClass())
+                .append(this.config()).build();
+    }
+
+    @Override
+    public StreamSinkProvider streamSink() {
+        return sinkProvider;
+    }
+
+    @Override
+    public Config config() {
+        return config;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
new file mode 100644
index 0000000..56ce14a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.sink.StreamSinkProvider;
+
+import java.io.Serializable;
+
+/**
+ * Execution Environment Context
+ */
+public interface Environment extends Serializable{
+    Config config();
+    StreamSinkProvider streamSink();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
new file mode 100644
index 0000000..7605d92
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Execution Runtime Adapter
+ */
+public interface ExecutionRuntime<Env extends Environment, Proc> {
+    /**
+     * @param environment
+     */
+    void prepare(Env environment);
+
+    Env environment();
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Conf config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Conf config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Conf config);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
new file mode 100644
index 0000000..9804d48
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.SparkEnvironment;
+import org.apache.eagle.app.environment.impl.SparkExecutionRuntime;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.environment.impl.StormExecutionRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manage execution runtime corresponding to Environment
+ *
+ * @see Environment
+ * @see ExecutionRuntime
+ */
+public class ExecutionRuntimeManager {
+    private final static ExecutionRuntimeManager INSTANCE = new ExecutionRuntimeManager();
+    private final static Logger LOGGER = LoggerFactory.getLogger(ExecutionRuntimeManager.class);
+
+    static {
+        getInstance().register(StormEnvironment.class,new StormExecutionRuntime.Provider());
+        getInstance().register(SparkEnvironment.class,new SparkExecutionRuntime.Provider());
+    }
+
+    private final Map<Class<? extends Environment>, ExecutionRuntimeProvider> executionRuntimeProviders;
+    private final Map<Environment, ExecutionRuntime> executionRuntimeCache;
+
+    private ExecutionRuntimeManager(){
+        executionRuntimeProviders = new HashMap<>();
+        executionRuntimeCache = new HashMap<>();
+    }
+
+    public static ExecutionRuntimeManager getInstance(){
+        return INSTANCE;
+    }
+
+    public <E extends Environment,P> ExecutionRuntime getRuntime(E environment) {
+        Preconditions.checkNotNull(environment,"Failed to create execution runtime as environment is null");
+        if(executionRuntimeCache.containsKey(environment))
+            return executionRuntimeCache.get(environment);
+
+        if(executionRuntimeProviders.containsKey(environment.getClass())){
+            ExecutionRuntime<E,P> runtime = ((ExecutionRuntimeProvider<E,P>)executionRuntimeProviders.get(environment.getClass())).get();
+            runtime.prepare(environment);
+            executionRuntimeCache.put(environment,runtime);
+            LOGGER.info("Created new execution runtime {} for environment: {}",runtime,environment);
+            return runtime;
+        } else {
+            LOGGER.error("No matched execution runtime found for environment: "+environment);
+            throw new IllegalStateException("No matched execution runtime found for environment: "+environment);
+        }
+    }
+
+    public <E extends Environment> ExecutionRuntime getRuntime(Class<E> environmentClass, Config config) {
+        try {
+            E environment = environmentClass.getConstructor(Config.class).newInstance(config);
+            return getRuntime(environment);
+        } catch (InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
+            LOGGER.error("Failed to create environment instance of type: "+environmentClass,e);
+            throw new RuntimeException("Failed to create environment instance of type: "+environmentClass,e);
+        }
+    }
+
+    public void register(Class<? extends Environment> appSuperClass,ExecutionRuntimeProvider executionRuntimeProvider){
+        if(executionRuntimeProviders.containsKey(appSuperClass)){
+            throw new IllegalStateException("Duplicated application type registered: "+appSuperClass.getCanonicalName());
+        }
+        executionRuntimeProviders.put(appSuperClass,executionRuntimeProvider);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeProvider.java
new file mode 100644
index 0000000..5728eb5
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeProvider.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+public interface ExecutionRuntimeProvider<Env extends Environment, Proc>{
+    ExecutionRuntime<Env,Proc> get();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
new file mode 100644
index 0000000..07113db
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.AbstractEnvironment;
+
+/**
+ * Storm Execution Environment Context
+ */
+public class SparkEnvironment extends AbstractEnvironment {
+    public SparkEnvironment(Config config){
+        super(config);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
new file mode 100644
index 0000000..5bcde92
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+
+public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
+    @Override
+    public void prepare(SparkEnvironment environment) {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public SparkEnvironment environment() {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void start(Application executor, Configuration config) {
+
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void stop(Application executor, Configuration config) {
+
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void status(Application executor, Configuration config) {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
+        @Override
+        public SparkExecutionRuntime get() {
+            return new SparkExecutionRuntime();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
new file mode 100644
index 0000000..4b4a0be
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.AbstractEnvironment;
+import org.apache.eagle.app.sink.FlattenEventMapper;
+import org.apache.eagle.app.sink.LoggingStreamSink;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.sink.StreamSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/**
+ * Storm Execution Environment Context
+ */
+public class StormEnvironment extends AbstractEnvironment {
+    public StormEnvironment(Config envConfig) {
+        super(envConfig);
+    }
+
+    public StormStreamSink getFlattenStreamSink(String streamId,Configuration appConfig) {
+        return ((StormStreamSink) streamSink().getSink(streamId,appConfig)).setEventMapper(new FlattenEventMapper(streamId));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
new file mode 100644
index 0000000..5854e08
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.thrift7.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
+    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static LocalCluster _localCluster;
+
+    private StormEnvironment environment;
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                if(_localCluster != null) {
+                    LOG.info("Shutting down local storm cluster instance");
+                    _localCluster.shutdown();
+                }
+            }
+        });
+    }
+
+    private static LocalCluster getLocalCluster(){
+        if(_localCluster == null){
+            _localCluster = new LocalCluster();
+        }
+        return _localCluster;
+    }
+
+    @Override
+    public void prepare(StormEnvironment environment) {
+        this.environment = environment;
+    }
+
+    @Override
+    public StormEnvironment environment() {
+        return this.environment;
+    }
+
+    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+
+    public backtype.storm.Config getStormConfig(){
+        backtype.storm.Config conf = new backtype.storm.Config();
+        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
+        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
+        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
+        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+        }
+        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
+        }
+        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
+        return conf;
+    }
+
+    @Override
+    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, Conf config){
+        String topologyName = config.getAppId();
+        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        StormTopology topology = executor.execute(config, environment);
+        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+        Config conf = getStormConfig();
+        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
+            if(config.getJarPath() == null) config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
+            String jarFile = config.getJarPath();
+            synchronized (StormExecutionRuntime.class) {
+                System.setProperty("storm.jar", jarFile);
+                LOG.info("Submitting as cluster mode ...");
+                try {
+                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+                } catch (AlreadyAliveException | InvalidTopologyException e) {
+                    LOG.error(e.getMessage(), e);
+                    throw new RuntimeException(e.getMessage(),e);
+                } finally {
+                    System.clearProperty("storm.jar");
+                }
+            }
+        } else {
+            LOG.info("Submitting as local mode ...");
+            getLocalCluster().submitTopology(topologyName, conf, topology);
+            LOG.info("Submitted");
+        }
+    }
+
+    @Override
+    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
+        String appId = config.getAppId();
+        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
+            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
+            try {
+                stormClient.killTopology(appId);
+            } catch (NotAliveException | TException e) {
+                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
+            }
+        } else {
+            KillOptions killOptions = new KillOptions();
+            killOptions.set_wait_secs(0);
+            getLocalCluster().killTopologyWithOpts(appId,killOptions);
+        }
+    }
+
+    @Override
+    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
+        // TODO: Not implemented yet!
+        throw new RuntimeException("TODO: Not implemented yet!");
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
+        @Override
+        public StormExecutionRuntime get() {
+            return new StormExecutionRuntime();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
index 8f1fcb9..13cf0c9 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
@@ -20,10 +20,10 @@
  * <h1>Application Management Framework Interfaces</h1>
  *
  * <ul>
- *     <li>Application Context (Runtime): org.apache.eagle.app.ApplicationContext</li>
+ *     <li>Application Context (Runtime): org.apache.eagle.app.service.ApplicationContext</li>
  *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
  *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
- *     <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycleListener</li>
+ *     <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycle</li>
  * </ul>
  */
 package org.apache.eagle.app;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
new file mode 100644
index 0000000..7a0de82
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.ApplicationLifecycle;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.utils.ApplicationConfigHelper;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.StreamDesc;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Managed Application Interface: org.apache.eagle.app.service.ApplicationContext
+ *
+ * <ul>
+ *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
+ *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
+ *     <li>Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycle</li>
+ * </ul>
+ */
+public class ApplicationContext implements Serializable, ApplicationLifecycle {
+    private final Configuration config;
+    private final Application application;
+    private final ExecutionRuntime runtime;
+    private final ApplicationEntity metadata;
+
+    /**
+     * @param metadata ApplicationEntity
+     * @param application Application
+     */
+    public ApplicationContext(Application application, ApplicationEntity metadata, Config config){
+        Preconditions.checkNotNull(application,"Application is null");
+        Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
+        this.application = application;
+        this.metadata = metadata;
+        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config);
+        Map<String,Object> applicationConfig = metadata.getConfiguration();
+        if(applicationConfig == null) {
+            applicationConfig = Collections.emptyMap();
+        }
+        this.config = ApplicationConfigHelper.convertFrom(applicationConfig,application.getConfigType());
+        this.config.setMode(metadata.getMode());
+        this.config.setAppId(metadata.getAppId());
+    }
+
+    @Override
+    public void onInstall() {
+        List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+            StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(),this.config);
+            StreamDesc streamDesc = new StreamDesc();
+            streamDesc.setSchema(streamDefinition);
+            streamDesc.setSink(streamSinkConfig);
+            streamDesc.setStreamId(streamDefinition.getStreamId());
+            return streamDesc;
+        })).collect(Collectors.toList());
+        metadata.setStreams(streamDescCollection);
+    }
+
+    @Override
+    public void onUninstall() {
+        //
+    }
+
+    @Override
+    public void onStart() {
+        this.runtime.start(this.application,this.config);
+    }
+
+    @Override
+    public void onStop() {
+        this.runtime.stop(this.application,this.config);
+    }
+
+    public ApplicationEntity getMetadata() {
+        return metadata;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index f681b6d..88c9f4d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -20,7 +20,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
-import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.service.ApplicationContext;
 import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.service.ApplicationManagementService;
 import org.apache.eagle.app.service.ApplicationProviderService;
@@ -65,35 +65,44 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         applicationEntity.setSite(siteEntity);
         applicationEntity.setConfiguration(operation.getConfiguration());
         applicationEntity.setMode(operation.getMode());
-        ApplicationContext applicationContext = new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),config);
-        applicationEntity.setStreams(applicationContext.getStreamSinkDescs());
-        applicationContext.onAppInstall();
+        ApplicationContext applicationContext = new ApplicationContext(
+                applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
+                applicationEntity,config);
+        applicationContext.onInstall();
         applicationEntityService.create(applicationEntity);
         return applicationEntity;
     }
 
     public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        ApplicationContext applicationContext = new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),config);
+        ApplicationContext applicationContext = new ApplicationContext(
+                applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
+                applicationEntity,config);
         // TODO: Check status, skip stop if already STOPPED
         try {
-            applicationContext.onAppStop();
+            applicationContext.onStop();
         }catch (Throwable throwable){
             LOGGER.error(throwable.getMessage(),throwable);
         }
-        applicationContext.onAppUninstall();
+        applicationContext.onUninstall();
         return applicationEntityService.delete(applicationEntity);
     }
 
     public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),this.config).onAppStart();
+        ApplicationContext applicationContext = new ApplicationContext(
+                applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
+                applicationEntity,config);
+        applicationContext.onStart();
         return applicationEntity;
     }
 
     public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),this.config).onAppStop();
+        ApplicationContext applicationContext = new ApplicationContext(
+                applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
+                applicationEntity,config);
+        applicationContext.onStop();
         return applicationEntity;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index 762f024..42285b3 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -20,7 +20,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.app.config.ApplicationProviderConfig;
 import org.apache.eagle.app.service.ApplicationProviderLoader;
 import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.tools.DynamicJarPathFinder;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
deleted file mode 100644
index 8280b5b..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.sink.mapper.StreamEventMapper;
-import org.apache.eagle.metadata.model.StreamSinkDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractStreamSink<K extends StreamSinkDesc> extends StreamSink<K> {
-    private final static Logger LOG = LoggerFactory.getLogger(AbstractStreamSink.class);
-    private final static String KEY_FIELD = "KEY";
-    private final static String VALUE_FIELD = "VALUE";
-    private StreamEventMapper streamEventMapper;
-
-    public AbstractStreamSink<K> setEventMapper(StreamEventMapper streamEventMapper){
-        this.streamEventMapper = streamEventMapper;
-        return this;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-        super.prepare(stormConf, context);
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        try {
-            List<StreamEvent> streamEvents = streamEventMapper.map(input);
-            if(streamEvents!=null) {
-                streamEvents.forEach((streamEvent -> {
-                    try {
-                        onEvent(streamEvent);
-                    } catch (Exception e) {
-                        LOG.error("Failed to execute event {}", streamEvent);
-                        collector.reportError(e);
-                    }
-                }));
-            }
-        } catch (Exception e) {
-                LOG.error("Failed to execute event {}",input);
-                collector.reportError(e);
-        }
-    }
-
-    protected abstract void onEvent(StreamEvent streamEvent);
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ad1a418/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
new file mode 100644
index 0000000..4a20e2b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+public class DefaultStreamSinkConfig implements StreamSinkConfig {
+    private final Class<?> streamPersistClass;
+    private final static String NONE_STORAGE_TYPE = "NONE";
+
+    public DefaultStreamSinkConfig(Class<?> streamPersistClass){
+        this.streamPersistClass = streamPersistClass;
+    }
+
+    @Override
+    public String getType() {
+        return NONE_STORAGE_TYPE;
+    }
+
+    public Class<?> getSinkType() {
+        return streamPersistClass;
+    }
+
+    @Override
+    public Class<? extends StreamSinkConfig> getConfigType() {
+        return DefaultStreamSinkConfig.class;
+    }
+}
\ No newline at end of file



Mime
View raw message