eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [3/3] incubator-eagle git commit: HBase audit monitoring with new app framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang Reviewer: Hao Chen
Date Mon, 08 Aug 2016 00:46:17 GMT
HBase audit monitoring with new app framework
https://issues.apache.org/jira/browse/EAGLE-420
Author: Yong Zhang
Reviewer: Hao Chen


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/660bfbd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/660bfbd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/660bfbd3

Branch: refs/heads/develop
Commit: 660bfbd3fac6febddb612b9b629a8d466b536b3d
Parents: 1d84256
Author: yonzhang <yonzhang2012@gmail.com>
Authored: Sun Aug 7 17:49:48 2016 -0700
Committer: yonzhang <yonzhang2012@gmail.com>
Committed: Sun Aug 7 17:49:48 2016 -0700

----------------------------------------------------------------------
 .../TestSiddhiStateSnapshotAndRestore.java      | 506 -------------------
 .../resolver/AttributeResolveResource.java      |   6 +-
 .../apache/eagle/app/AbstractApplication.java   | 143 +++---
 .../java/org/apache/eagle/app/Application.java  |  11 +-
 .../eagle/app/environment/ExecutionRuntime.java | 107 ++--
 .../environment/impl/SparkExecutionRuntime.java | 117 ++---
 .../app/environment/impl/StormEnvironment.java  |  84 +--
 .../environment/impl/StormExecutionRuntime.java | 327 ++++++------
 .../eagle/app/service/ApplicationContext.java   |  13 +-
 .../impl/ApplicationManagementServiceImpl.java  |  36 +-
 .../apache/eagle/app/sink/KafkaStreamSink.java  |  52 +-
 .../eagle/app/sink/KafkaStreamSinkConfig.java   |  29 +-
 .../eagle/app/sink/LoggingStreamSink.java       |   5 +-
 .../apache/eagle/app/sink/StormStreamSink.java  |   2 +-
 .../eagle/app/sink/StreamSinkProvider.java      |  93 ++--
 .../app/spi/AbstractApplicationProvider.java    |   2 +-
 .../eagle/app/spi/ApplicationProvider.java      |   2 +-
 .../eagle/app/test/ServerSimulatorImpl.java     |   7 +-
 .../apache/eagle/app/TestStormApplication.java  | 174 ++++---
 .../eagle/app/storm/MockStormApplication.java   | 191 ++++---
 .../app/storm/MockStormApplicationTest.java     | 126 ++---
 .../src/test/resources/application.conf         |  17 +-
 .../ApplicationManagementResource.java          |   1 -
 .../entity/GenericServiceAPIResponseEntity.java |  12 +-
 ...ricServiceAPIResponseEntityDeserializer.java |   5 +-
 .../generic/GenericEntityServiceResource.java   |   6 +-
 .../app/example/ExampleStormApplication.java    |  10 +-
 .../example/ExampleApplicationProviderTest.java |  33 +-
 .../src/test/resources/application.conf         |  12 +-
 .../apache/eagle/app/jpm/JPMApplication.java    |   8 +-
 .../eagle/app/jpm/JPMApplicationTest.java       |  19 +-
 eagle-security/eagle-security-common/pom.xml    |   5 +
 .../service/IMetadataServiceClient.java         |  32 ++
 .../security/service/ISecurityMetadataDAO.java  |   1 +
 .../security/service/InMemMetadataDaoImpl.java  |   5 +-
 .../security/service/MetadataDaoFactory.java    |  32 +-
 .../service/MetadataServiceClientImpl.java      | 114 +++++
 .../service/SensitivityMetadataResource.java    |  27 -
 .../topo/NewKafkaSourcedSpoutProvider.java      |   2 -
 .../security/hbase/HBaseAuditLogAppConf.java    |  28 +
 .../hbase/HBaseAuditLogAppProvider.java         |  38 ++
 .../hbase/HBaseAuditLogApplication.java         |  31 +-
 .../hbase/HbaseAuditLogMonitoringMain.java      |  72 ---
 .../HbaseResourceSensitivityDataJoinBolt.java   |   8 +-
 .../HbaseResourceSensitivityPollingJob.java     |  36 +-
 .../src/main/resources/META-INF/metadata.xml    | 243 +++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  35 ++
 .../src/main/resources/application.conf         |   8 +-
 .../src/main/resources/application.conf.bak     |  66 ---
 .../src/main/resources/metadata.xml             |  91 ----
 .../src/main/resources/scripts                  |  22 +
 .../hbase/TestHbaseAuditLogProcessTopology.java |  44 --
 .../hbase/SensitivityMetadataResource.java      |  64 +++
 eagle-server/pom.xml                            |  14 +
 .../src/main/resources/application.conf         |   2 +-
 .../src/main/resources/configuration.yml        |  21 +
 .../eagle/server/ServerApplicationTest.java     |   4 +-
 .../src/test/resources/configuration.yml        |  21 +
 58 files changed, 1609 insertions(+), 1613 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
deleted file mode 100644
index 131be28..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
-import org.wso2.siddhi.core.util.persistence.PersistenceStore;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-
-/**
- * experiment Siddhi state snapshot and restore
- */
-public class TestSiddhiStateSnapshotAndRestore {
-    private ExecutionPlanRuntime setupRuntimeForSimple(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "define stream testStream (cmd string, src string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from testStream[(cmd == 'rename') and (src == '/tmp/pii')] " +
-                "select cmd, src " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testSimpleSiddhiQuery() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForSimple();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForSimple();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindow(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream#window.length(3) "
-                + " select *"
-                + " insert all events into OutputStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Ignore
-    public void testLengthSlideWindow() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-lengthslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindow();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindow();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream#window.length(50) "
-                + " select user, cmd, count(user) as cnt"
-                + " insert all events into OutputStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Ignore
-    public void testLengthSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-lengthslidewindowwithgroupby";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForTimeSlideWindow(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
-                + " select user, timeStamp " +
-                "insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testTimeSlideWindow() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-timeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForTimeSlideWindow();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForTimeSlideWindow();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-    
-    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
-                + " select user, timeStamp, count(user) as cnt"
-                + " group by user"
-                + " having cnt > 2"
-               + " insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testExternalTimeSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-externaltimeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-        Thread.sleep(1000);
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
-                + " select user, timeStamp, count(user) as cnt"
-                + " group by user"
-                + " insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-        siddhiManager.setPersistenceStore(persistenceStore);
-
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        executionPlanRuntime.start();
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-        Thread.sleep(100);
-        executionPlanRuntime.persist();
-        executionPlanRuntime.shutdown();
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        restoredRuntime.start();
-        restoredRuntime.restoreLastRevision();
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
-                + " select user, count(user) as cnt"
-                + " group by user"
-                + " having cnt > 2"
-                + " insert events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testInternalTimeSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-internaltimeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private int count;
-    private boolean eventArrived;
-
-    @Before
-    public void init() {
-        count = 0;
-        eventArrived = false;
-    }
-
-    /**
-     * Siddhi does not support external time window based snapshot
-     * @throws InterruptedException
-     */
-    public void persistenceTest7() throws InterruptedException {
-        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-
-        SiddhiManager siddhiManager = new SiddhiManager();
-        siddhiManager.setPersistenceStore(persistenceStore);
-
-        String executionPlan = "" +
-                "@plan:name('Test') " +
-                "" +
-                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
-                "" +
-                "@info(name = 'query1')" +
-                "from StockStream#window.externalTime(timestamp,30 sec) " +
-                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
-                "group by symbol " +
-                "insert into OutStream ";
-
-        QueryCallback queryCallback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                eventArrived = true;
-                for (Event inEvent : inEvents) {
-                    count++;
-                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
-                    if (count == 5) {
-                        Assert.assertEquals(400l, inEvent.getData(2));
-                    }
-                    if (count == 6) {
-                        Assert.assertEquals(200l, inEvent.getData(2));
-                    }
-                }
-            }
-        };
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-        executionPlanRuntime.addCallback("query1", queryCallback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-        executionPlanRuntime.start();
-        long currentTime = 0;
-
-        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
-
-        Thread.sleep(500);
-        Assert.assertTrue(eventArrived);
-        Assert.assertEquals(3, count);
-
-        //persisting
-        Thread.sleep(500);
-        executionPlanRuntime.persist();
-
-        //restarting execution plan
-        Thread.sleep(500);
-        executionPlanRuntime.shutdown();
-        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-        executionPlanRuntime.addCallback("query1", queryCallback);
-        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-        executionPlanRuntime.start();
-
-        //loading
-        executionPlanRuntime.restoreLastRevision();
-
-        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
-
-        //shutdown execution plan
-        Thread.sleep(500);
-        executionPlanRuntime.shutdown();
-
-        Assert.assertEquals(count, 6);
-        Assert.assertEquals(true, eventArrived);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
index 68995d1..bcffec1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
@@ -49,7 +49,7 @@ public class AttributeResolveResource {
             response.setObj(result);
         } catch (Exception e) {
             response.setSuccess(false);
-            response.setException(EagleExceptionWrapper.wrap(e));
+            response.setException(e);
             return response;
         }
         return response;
@@ -73,9 +73,9 @@ public class AttributeResolveResource {
             response.setObj(result);
         } catch (Exception e) {
             response.setSuccess(false);
-            response.setException(EagleExceptionWrapper.wrap(e));
+            response.setException(e);
             return response;
         }
         return response;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index dca2e3d..5b498eb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -1,69 +1,74 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.Environment;
-import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.utils.ApplicationConfigHelper;
-
-import java.lang.reflect.ParameterizedType;
-import java.util.Map;
-
-abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
-    private Class<Conf> parametrizedConfigClass;
-
-    @Override
-    public Proc execute(Map<String, Object> config, Env env) {
-        return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
-    }
-
-    /**
-     *  Map application configuration from environment
-     *
-     * @param config
-     * @return
-     */
-    private Conf loadAppConfigFromEnv(Config config){
-        return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
-    }
-
-    @Override
-    public void run(Config config) {
-        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
-    }
-
-    @Override
-    public void run(Configuration conf, Config config) {
-        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
-    }
-
-    @Override
-    public Proc execute(Env environment) {
-        return execute(loadAppConfigFromEnv(environment.config()),environment);
-    }
-
-    /**
-     * @return Config class from Generic Type
-     */
-    public Class<Conf> getConfigType(){
-        if (parametrizedConfigClass == null) {
-            this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-        }
-        return parametrizedConfigClass;
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.Environment;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.utils.ApplicationConfigHelper;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Map;
+
+abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
+    private Class<Conf> parametrizedConfigClass;
+
+    @Override
+    public Proc execute(Map<String, Object> config, Env env) {
+        return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
+    }
+
+    @Override
+    public Proc execute(Config config, Env environment){
+        return null;
+    }
+    /**
+     *  Map application configuration from environment
+     *
+     * @param config
+     * @return
+     */
+    private Conf loadAppConfigFromEnv(Config config){
+        return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
+    }
+
+    @Override
+    public void run(Config config) {
+//        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
+        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,config);
+    }
+
+    @Override
+    public void run(Configuration conf, Config config) {
+//        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
+    }
+
+    @Override
+    public Proc execute(Env environment) {
+        return execute(loadAppConfigFromEnv(environment.config()),environment);
+    }
+
+    /**
+     * @return Config class from Generic Type
+     */
+    public Class<Conf> getConfigType(){
+        if (parametrizedConfigClass == null) {
+            this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+        }
+        return parametrizedConfigClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index 699b13e..d1e4c9b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.app;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.app.environment.Environment;
 
 import java.io.Serializable;
@@ -63,6 +64,14 @@ public interface Application <
     Proc execute(Map<String,Object> config, Env environment);
 
     /**
+     * Execute with type-safe configuration
+     * @param config
+     * @param environment
+     * @return
+     */
+    Proc execute(Config config, Env environment);
+
+    /**
      * Execute with environment based configuration
      *
      * Light-weight Runner (dry-run/test purpose) oriented interface
@@ -81,4 +90,4 @@ public interface Application <
      * @return application environment type
      */
     Class<? extends Env> getEnvironmentType();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
index 7605d92..c4e89f4 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
@@ -1,53 +1,54 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-
-/**
- * Execution Runtime Adapter
- */
-public interface ExecutionRuntime<Env extends Environment, Proc> {
-    /**
-     * @param environment
-     */
-    void prepare(Env environment);
-
-    Env environment();
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Conf config);
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Conf config);
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Conf config);
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Execution Runtime Adapter
+ */
+public interface ExecutionRuntime<Env extends Environment, Proc> {
+    /**
+     * @param environment
+     */
+    void prepare(Env environment);
+
+    Env environment();
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Config config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Config config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
index 5bcde92..5d4e049 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
@@ -1,58 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-
-public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
-    @Override
-    public void prepare(SparkEnvironment environment) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public SparkEnvironment environment() {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void start(Application executor, Configuration config) {
-
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void stop(Application executor, Configuration config) {
-
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void status(Application executor, Configuration config) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
-        @Override
-        public SparkExecutionRuntime get() {
-            return new SparkExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+
+public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
+    @Override
+    public void prepare(SparkEnvironment environment) {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public SparkEnvironment environment() {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void start(Application executor, Config config) {
+
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void stop(Application executor, Config config) {
+
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    @Override
+    public void status(Application executor, Config config) {
+        throw new RuntimeException("Not implemented yet");
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
+        @Override
+        public SparkExecutionRuntime get() {
+            return new SparkExecutionRuntime();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 4b4a0be..1112588 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -1,42 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.sink.FlattenEventMapper;
-import org.apache.eagle.app.sink.LoggingStreamSink;
-import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.app.sink.StreamSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-/**
- * Storm Execution Environment Context
- */
-public class StormEnvironment extends AbstractEnvironment {
-    public StormEnvironment(Config envConfig) {
-        super(envConfig);
-    }
-
-    public StormStreamSink getFlattenStreamSink(String streamId,Configuration appConfig) {
-        return ((StormStreamSink) streamSink().getSink(streamId,appConfig)).setEventMapper(new FlattenEventMapper(streamId));
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.AbstractEnvironment;
+import org.apache.eagle.app.sink.FlattenEventMapper;
+import org.apache.eagle.app.sink.LoggingStreamSink;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.sink.StreamSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/**
+ * Storm Execution Environment Context
+ */
+public class StormEnvironment extends AbstractEnvironment {
+    public StormEnvironment(Config envConfig) {
+        super(envConfig);
+    }
+
+    public StormStreamSink getFlattenStreamSink(String streamId, Config config) {
+        return ((StormStreamSink) streamSink().getSink(streamId,config)).setEventMapper(new FlattenEventMapper(streamId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 23e7334..06c22dc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -1,162 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.utils.NimbusClient;
-import com.google.common.base.Preconditions;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
-    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
-    private static LocalCluster _localCluster;
-
-    private StormEnvironment environment;
-
-//    static {
-//        Runtime.getRuntime().addShutdownHook(new Thread(){
-//            @Override
-//            public void run() {
-//                if(_localCluster != null) {
-//                    LOG.info("Shutting down local storm cluster instance");
-//                    _localCluster.shutdown();
-//                }
-//            }
-//        });
-//    }
-
-    private static LocalCluster getLocalCluster(){
-        if(_localCluster == null){
-            _localCluster = new LocalCluster();
-        }
-        return _localCluster;
-    }
-
-    @Override
-    public void prepare(StormEnvironment environment) {
-        this.environment = environment;
-    }
-
-    @Override
-    public StormEnvironment environment() {
-        return this.environment;
-    }
-
-    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
-    public backtype.storm.Config getStormConfig(){
-        backtype.storm.Config conf = new backtype.storm.Config();
-        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
-        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
-        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
-        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
-            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
-        }
-        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
-            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
-        }
-        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
-        return conf;
-    }
-
-    @Override
-    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, Conf config){
-        String topologyName = config.getAppId();
-        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
-        StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
-        Config conf = getStormConfig();
-        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
-            if(config.getJarPath() == null) config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
-            String jarFile = config.getJarPath();
-            synchronized (StormExecutionRuntime.class) {
-                System.setProperty("storm.jar", jarFile);
-                LOG.info("Submitting as cluster mode ...");
-                try {
-                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
-                } catch (AlreadyAliveException | InvalidTopologyException e) {
-                    LOG.error(e.getMessage(), e);
-                    throw new RuntimeException(e.getMessage(),e);
-                } finally {
-                    System.clearProperty("storm.jar");
-                }
-            }
-        } else {
-            LOG.info("Submitting as local mode ...");
-            getLocalCluster().submitTopology(topologyName, conf, topology);
-            LOG.info("Submitted");
-        }
-    }
-
-    @Override
-    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
-        String appId = config.getAppId();
-        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
-            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
-            try {
-                stormClient.killTopology(appId);
-            } catch (NotAliveException | TException e) {
-                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
-            }
-        } else {
-            KillOptions killOptions = new KillOptions();
-            killOptions.set_wait_secs(0);
-            getLocalCluster().killTopologyWithOpts(appId,killOptions);
-        }
-    }
-
-    @Override
-    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
-        // TODO: Not implemented yet!
-        throw new RuntimeException("TODO: Not implemented yet!");
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
-        @Override
-        public StormExecutionRuntime get() {
-            return new StormExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.thrift7.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
+    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static LocalCluster _localCluster;
+
+    private StormEnvironment environment;
+
+//    static {
+//        Runtime.getRuntime().addShutdownHook(new Thread(){
+//            @Override
+//            public void run() {
+//                if(_localCluster != null) {
+//                    LOG.info("Shutting down local storm cluster instance");
+//                    _localCluster.shutdown();
+//                }
+//            }
+//        });
+//    }
+
+    private static LocalCluster getLocalCluster(){
+        if(_localCluster == null){
+            _localCluster = new LocalCluster();
+        }
+        return _localCluster;
+    }
+
+    @Override
+    public void prepare(StormEnvironment environment) {
+        this.environment = environment;
+    }
+
+    @Override
+    public StormEnvironment environment() {
+        return this.environment;
+    }
+
+    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+
+    public backtype.storm.Config getStormConfig(){
+        backtype.storm.Config conf = new backtype.storm.Config();
+        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
+        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
+        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
+        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+        }
+        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
+        }
+        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
+        return conf;
+    }
+
+    @Override
+    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+        String topologyName = config.getString("appId");
+        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        StormTopology topology = executor.execute(config, environment);
+        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+        Config conf = getStormConfig();
+        if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+//            if(config.getString("jarPath") == null) config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
+            String jarFile = config.getString("jarPath");
+            if(jarFile == null){
+                jarFile = DynamicJarPathFinder.findPath(executor.getClass());
+            }
+            synchronized (StormExecutionRuntime.class) {
+                System.setProperty("storm.jar", jarFile);
+                LOG.info("Submitting as cluster mode ...");
+                try {
+                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+                } catch (AlreadyAliveException | InvalidTopologyException e) {
+                    LOG.error(e.getMessage(), e);
+                    throw new RuntimeException(e.getMessage(),e);
+                } finally {
+                    System.clearProperty("storm.jar");
+                }
+            }
+        } else {
+            LOG.info("Submitting as local mode ...");
+            getLocalCluster().submitTopology(topologyName, conf, topology);
+            LOG.info("Submitted");
+        }
+    }
+
+    @Override
+    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        String appId = config.getString("appId");
+        if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
+            try {
+                stormClient.killTopology(appId);
+            } catch (NotAliveException | TException e) {
+                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
+            }
+        } else {
+            KillOptions killOptions = new KillOptions();
+            killOptions.set_wait_secs(0);
+            getLocalCluster().killTopologyWithOpts(appId,killOptions);
+        }
+    }
+
+    @Override
+    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        // TODO: Not implemented yet!
+        throw new RuntimeException("TODO: Not implemented yet!");
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
+        @Override
+        public StormExecutionRuntime get() {
+            return new StormExecutionRuntime();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 7a0de82..90137e5 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -18,6 +18,7 @@ package org.apache.eagle.app.service;
 
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.ApplicationLifecycle;
 import org.apache.eagle.app.Configuration;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
  * </ul>
  */
 public class ApplicationContext implements Serializable, ApplicationLifecycle {
-    private final Configuration config;
+    private final Config config;
     private final Application application;
     private final ExecutionRuntime runtime;
     private final ApplicationEntity metadata;
@@ -54,19 +55,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
      * @param metadata ApplicationEntity
      * @param application Application
      */
-    public ApplicationContext(Application application, ApplicationEntity metadata, Config config){
+    public ApplicationContext(Application application, ApplicationEntity metadata, Config config1){
         Preconditions.checkNotNull(application,"Application is null");
         Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
         this.application = application;
         this.metadata = metadata;
-        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config);
+        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config1);
         Map<String,Object> applicationConfig = metadata.getConfiguration();
         if(applicationConfig == null) {
             applicationConfig = Collections.emptyMap();
         }
-        this.config = ApplicationConfigHelper.convertFrom(applicationConfig,application.getConfigType());
-        this.config.setMode(metadata.getMode());
-        this.config.setAppId(metadata.getAppId());
+        this.config = ConfigFactory.parseMap(applicationConfig);
     }
 
     @Override
@@ -100,4 +99,4 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
     public ApplicationEntity getMetadata() {
         return metadata;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index 88c9f4d..c98e7cc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -20,19 +20,26 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.service.ApplicationContext;
 import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.service.ApplicationManagementService;
 import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.Property;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.service.ApplicationEntityService;
 import org.apache.eagle.metadata.service.SiteEntityService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @Singleton
 public class ApplicationManagementServiceImpl implements ApplicationManagementService {
     private final SiteEntityService siteEntityService;
@@ -53,6 +60,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         this.applicationEntityService = applicationEntityService;
     }
 
+    @Override
     public ApplicationEntity install(ApplicationOperations.InstallOperation operation) throws EntityNotFoundException {
         Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
         Preconditions.checkNotNull(operation.getAppType(),"appType is null");
@@ -63,16 +71,38 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         ApplicationEntity applicationEntity = new ApplicationEntity();
         applicationEntity.setDescriptor(appDesc);
         applicationEntity.setSite(siteEntity);
-        applicationEntity.setConfiguration(operation.getConfiguration());
+
+        /*
+         * Compute the effective application config by layering, in order of
+         * increasing precedence:
+         *   1) default property values declared in metadata.xml
+         *   2) the user-supplied configuration values
+         *   3) derived metadata such as siteId, mode and appId
+         */
+        Map<String, Object> appConfig = new HashMap<>();
+        ApplicationProvider provider = applicationProviderService.getApplicationProviderByType(operation.getAppType());
+        List<Property> propertyList = provider.getApplicationDesc().getConfiguration().getProperties();
+        for(Property p : propertyList){
+            appConfig.put(p.getName(), p.getValue());
+        }
+        if(operation.getConfiguration() != null) {
+            appConfig.putAll(operation.getConfiguration());
+        }
+        appConfig.put("siteId", operation.getSiteId());
+        appConfig.put("mode", operation.getMode().name());
+        appConfig.put("appId", operation.getAppType());
+
+        applicationEntity.setConfiguration(appConfig);
         applicationEntity.setMode(operation.getMode());
         ApplicationContext applicationContext = new ApplicationContext(
                 applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
                 applicationEntity,config);
         applicationContext.onInstall();
         applicationEntityService.create(applicationEntity);
+
         return applicationEntity;
     }
 
+    @Override
     public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -88,6 +118,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         return applicationEntityService.delete(applicationEntity);
     }
 
+    @Override
     public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -97,6 +128,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         return applicationEntity;
     }
 
+    @Override
     public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -105,4 +137,4 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         applicationContext.onStop();
         return applicationEntity;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index dda58d6..e4adfd2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -17,32 +17,60 @@
 package org.apache.eagle.app.sink;
 
 import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Properties;
 
 public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
-    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
     private String topicId;
+    private Producer producer;
+    private KafkaStreamSinkConfig config;
 
     @Override
     public void init(String streamId, KafkaStreamSinkConfig config) {
         super.init(streamId, config);
         this.topicId = config.getTopicId();
+        this.config = config;
     }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context) {
         super.prepare(stormConf, context);
-        // TODO: Create KafkaProducer
+        Properties properties = new Properties();
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        ProducerConfig producerConfig = new ProducerConfig(properties);
+        producer = new Producer(producerConfig);
     }
 
     @Override
     protected void onEvent(StreamEvent streamEvent) {
-        LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        LOG.info("TODO: producing {} to '{}'", input, topicId);
+
+        try {
+            Map m = (Map) input.getValue(1);
+            String output = new ObjectMapper().writeValueAsString(m);
+            producer.send(new KeyedMessage(this.topicId, m.get("user"), output));
+        }catch(Exception ex){
+            LOG.error("", ex);
+            collector.reportError(ex);
+        }
     }
 
     @Override
@@ -51,11 +79,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     private void ensureTopicCreated(){
-        LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
+        LOG.info("TODO: ensure kafka topic {} created",this.topicId);
     }
 
     private void ensureTopicDeleted(){
-        LOGGER.info("TODO: ensure kafka topic {} deleted",this.topicId);
+        LOG.info("TODO: ensure kafka topic {} deleted",this.topicId);
     }
 
     @Override
@@ -65,12 +93,12 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
 
     public static class Provider implements StreamSinkProvider<KafkaStreamSink,KafkaStreamSinkConfig> {
         @Override
-        public KafkaStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
-            String topicId = String.format("EAGLE.%s.%s",
-                    appConfig.getSiteId(),
-                    streamId).toLowerCase();
+        public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
             KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
-            desc.setTopicId(topicId);
+            desc.setTopicId(config.getString("dataSinkConfig.topic"));
+            desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
+            desc.setSerializerClass(config.getString("dataSinkConfig.serializerClass"));
+            desc.setKeySerializerClass(config.getString("dataSinkConfig.keySerializerClass"));
             return desc;
         }
 
@@ -79,4 +107,4 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
             return new KafkaStreamSink();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
index 17a3aa8..9d6a0ab 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -20,6 +20,9 @@ import org.apache.eagle.metadata.model.StreamSinkConfig;
 
 public class KafkaStreamSinkConfig implements StreamSinkConfig {
     private String topicId;
+    private String brokerList;
+    private String serializerClass;
+    private String keySerializerClass;
 
     public String getTopicId() {
         return topicId;
@@ -29,6 +32,30 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
         this.topicId = topicId;
     }
 
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
+    public String getSerializerClass() {
+        return serializerClass;
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
+    }
+
+    public String getKeySerializerClass() {
+        return keySerializerClass;
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
+    }
+
     @Override
     public String getType() {
         return "KAFKA";
@@ -43,4 +70,4 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
     public Class<? extends StreamSinkConfig> getConfigType() {
         return KafkaStreamSinkConfig.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 3efd811..0d835d6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.app.sink;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.app.Configuration;
@@ -43,7 +44,7 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
 
     public static class Provider implements StreamSinkProvider<LoggingStreamSink,DefaultStreamSinkConfig> {
         @Override
-        public DefaultStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
+        public DefaultStreamSinkConfig getSinkConfig(String streamId, Config config) {
             return new DefaultStreamSinkConfig(LoggingStreamSink.class);
         }
 
@@ -52,4 +53,4 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
             return new LoggingStreamSink();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
index 1b35a99..6765443 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -85,4 +85,4 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
     public String getStreamId() {
         return streamId;
     }
-}
\ No newline at end of file
+}


Mime
View raw message