eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [2/3] incubator-eagle git commit: HBase audit monitoring with new app framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang Reviewer: Hao Chen
Date Mon, 08 Aug 2016 00:46:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
index 7c64e50..60f49ef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
@@ -1,46 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
-    /**
-     * @param streamId
-     * @param appConfig
-     * @return
-     */
-    D getSinkConfig(String streamId, Configuration appConfig);
-    S getSink();
-
-    default S getSink(String streamId, Configuration appConfig){
-        S s = getSink();
-        s.init(streamId,getSinkConfig(streamId,appConfig));
-        return s;
-    }
-
-    default Class<? extends S> getSinkType(){
-        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-    }
-
-    default Class<? extends D> getSinkConfigType(){
-        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
+    /**
+     * @param streamId
+     * @param config
+     * @return
+     */
+    D getSinkConfig(String streamId, Config config);
+    S getSink();
+
+    default S getSink(String streamId, Config config){
+        S s = getSink();
+        s.init(streamId,getSinkConfig(streamId,config));
+        return s;
+    }
+
+    default Class<? extends S> getSinkType(){
+        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+    }
+
+    default Class<? extends D> getSinkConfigType(){
+        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 20816db..bf1e587 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -128,4 +128,4 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
     public ApplicationDesc getApplicationDesc() {
         return applicationDesc;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index ace0c45..be84f0c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -34,4 +34,4 @@ public interface ApplicationProvider<T extends Application> {
      * @return application instance
      */
     T getApplication();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
index 2c686d9..1ef91ff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
@@ -54,8 +54,11 @@ public class ServerSimulatorImpl extends ServerSimulator {
         SiteEntity siteEntity = getUniqueSite();
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(appConfig);
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity =
+                applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
     }
@@ -86,4 +89,4 @@ public class ServerSimulatorImpl extends ServerSimulator {
             throw new IllegalStateException(e.getMessage(),e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index 0558454..f58f0aa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -1,84 +1,90 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Map;
-
-@Ignore
-public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
-    @Override
-    public StormTopology execute(TestStormAppConfig config, StormEnvironment environment) {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
-        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        return builder.createTopology();
-    }
-
-    public final static class TestStormAppConfig extends Configuration{
-        private int spoutNum = 1;
-
-        public int getSpoutNum() {
-            return spoutNum;
-        }
-
-        public void setSpoutNum(int spoutNum) {
-            this.spoutNum = spoutNum;
-        }
-    }
-
-    private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
-        @Override
-        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
-        }
-
-        @Override
-        public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
-        }
-    }
-
-    public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
-        public Provider(){
-            super("TestApplicationMetadata.xml");
-        }
-        @Override
-        public TestStormApplication getApplication() {
-            return new TestStormApplication();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Ignore
+public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
+    @Override
+    public StormTopology execute(TestStormAppConfig config, StormEnvironment environment){
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        return builder.createTopology();
+    }
+
+    public final static class TestStormAppConfig extends Configuration{
+        private int spoutNum = 1;
+
+        public int getSpoutNum() {
+            return spoutNum;
+        }
+
+        public void setSpoutNum(int spoutNum) {
+            this.spoutNum = spoutNum;
+        }
+    }
+
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
+        @Override
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
+        }
+
+        @Override
+        public void nextTuple() {
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+        }
+    }
+
+    public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
+        public Provider(){
+            super("TestApplicationMetadata.xml");
+        }
+        @Override
+        public TestStormApplication getApplication() {
+            return new TestStormApplication();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 3db5f20..bbdfbfa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -1,97 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
-    private MockStormConfiguration appConfig;
-
-    @Override
-    public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
-        this.setAppConfig(config);
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
-        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        return builder.createTopology();
-    }
-
-    public MockStormConfiguration getAppConfig() {
-        return appConfig;
-    }
-
-    private void setAppConfig(MockStormConfiguration appConfig) {
-        this.appConfig = appConfig;
-    }
-
-    /**
-     * TODO: Load configuration from name space in application className
-     * Application Configuration
-     */
-    static class MockStormConfiguration extends Configuration {
-        private int spoutNum = 1;
-        private boolean loaded = false;
-
-        public int getSpoutNum() {
-            return spoutNum;
-        }
-
-        public void setSpoutNum(int spoutNum) {
-            this.spoutNum = spoutNum;
-        }
-
-        public boolean isLoaded() {
-            return loaded;
-        }
-
-        public void setLoaded(boolean loaded) {
-            this.loaded = loaded;
-        }
-    }
-
-    private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
-        @Override
-        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
-        }
-
-        @Override
-        public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
+    private MockStormConfiguration appConfig;
+
+    @Override
+    public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        return builder.createTopology();
+    }
+
+    /**
+     * TODO: Load configuration from name space in application className
+     * Application Configuration
+     */
+    static class MockStormConfiguration extends Configuration {
+        private int spoutNum = 1;
+        private boolean loaded = false;
+
+        public int getSpoutNum() {
+            return spoutNum;
+        }
+
+        public void setSpoutNum(int spoutNum) {
+            this.spoutNum = spoutNum;
+        }
+
+        public boolean isLoaded() {
+            return loaded;
+        }
+
+        public void setLoaded(boolean loaded) {
+            this.loaded = loaded;
+        }
+    }
+
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
+        @Override
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
+        }
+
+        @Override
+        public void nextTuple() {
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
index 8e05b5e..32dae23 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
@@ -1,75 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class MockStormApplicationTest {
-    @Test
-    public void testGetConfigClass(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
-    }
-
-    @Test
-    public void testGetConfigFromMap(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        mockStormApplication.execute(new HashMap<String,Object>(){
-            {
-                put("spoutNum",1234);
-                put("loaded",true);
-                put("mode", ApplicationEntity.Mode.CLUSTER);
-            }
-        },new StormEnvironment(ConfigFactory.load()));
-        Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
-        Assert.assertEquals(1234,mockStormApplication.getAppConfig().getSpoutNum());
-        Assert.assertEquals(ApplicationEntity.Mode.CLUSTER,mockStormApplication.getAppConfig().getMode());
-    }
-
-    @Test
-    public void testGetConfigFromEnvironmentConfigFile(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        mockStormApplication.execute(new StormEnvironment(ConfigFactory.load()));
-        Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
-        Assert.assertEquals(3,mockStormApplication.getAppConfig().getSpoutNum());
-        Assert.assertEquals(ApplicationEntity.Mode.LOCAL,mockStormApplication.getAppConfig().getMode());
-    }
-
-    @Test
-    public void testRunApplicationWithSysConfig(){
-        new MockStormApplication().run();
-    }
-
-    @Test
-    public void testRunApplicationWithAppConfig() throws InterruptedException {
-        MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
-        appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
-        appConfig.setSiteId("test_site");
-        appConfig.setAppId("test_application_storm_topology");
-        appConfig.setMode(ApplicationEntity.Mode.LOCAL);
-        appConfig.setLoaded(true);
-        appConfig.setSpoutNum(4);
-        new MockStormApplication().run(appConfig);
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class MockStormApplicationTest {
+    @Test
+    public void testGetConfigClass(){
+        MockStormApplication mockStormApplication = new MockStormApplication();
+        Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
+    }
+
+    @Test
+    public void testRunApplicationWithSysConfig(){
+        new MockStormApplication().run();
+    }
+
+    @Test
+    public void testRunApplicationWithAppConfig() throws InterruptedException {
+        MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
+        appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
+        appConfig.setSiteId("test_site");
+        appConfig.setAppId("test_application_storm_topology");
+        appConfig.setMode(ApplicationEntity.Mode.LOCAL);
+        appConfig.setLoaded(true);
+        appConfig.setSpoutNum(4);
+        new MockStormApplication().run(appConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 9f154f9..64f0974 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -52,10 +52,15 @@
 		}
 	}
 
-	"org.apache.eagle.app.storm.MockStormApplication": {
-		"spoutNum": 3
-		"loaded": true
-		"mode":"LOCAL",
-		"appId":"test_topology_name"
+	"appId":"test_topology_name"
+	"spoutNum": 3
+	"loaded": true
+	"mode":"LOCAL"
+
+	"dataSinkConfig": {
+		"topic" : "test_topic",
+		"brokerList" : "sandbox.hortonworks.com:6667",
+		"serializerClass" : "kafka.serializer.StringEncoder",
+		"keySerializerClass" : "kafka.serializer.StringEncoder"
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
index 6e4521d..c651b6b 100644
--- a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
@@ -21,7 +21,6 @@ package org.apache.eagle.service.application;
 
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
 import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
 import org.apache.eagle.service.application.entity.TopologyExecutionStatus;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index edb9fc0..2b37d25 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -111,11 +111,11 @@ public class GenericServiceAPIResponseEntity<T>{
 	public String getException() {
 		return exception;
 	}
-	public void setException(String exception) {
-		this.exception = exception;
-	}
+//	public void setException(String exception) {
+//		this.exception = exception;
+//	}
 
-    public void setException(Exception exception){
-        if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exception);
+    public void setException(Exception exceptionObj){
+        if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exceptionObj);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 85b875d..940ee8a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -54,7 +54,8 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
                 else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
                     entity.setSuccess(field.getValue().getValueAsBoolean(false));
                 }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
-                    entity.setException(field.getValue().getTextValue());
+//                    entity.setException(field.getValue().getTextValue());
+                    entity.setException(new Exception(field.getValue().getTextValue()));
                 }else if(TYPE_FIELD.endsWith(field.getKey())  && field.getValue() != null){
                     try {
                         entity.setType(Class.forName(field.getValue().getTextValue()));
@@ -81,4 +82,4 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
         }
         return entity;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
index fb52352..b7f27f5 100644
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -439,7 +439,7 @@ public class GenericEntityServiceResource {
                 LOG.error("Data storage is null");
                 throw new IllegalDataStorageException("data storage is null");
             }
-            
+
             QueryResult<?> result = queryStatement.execute(dataStorage);
             if(result.isSuccess()){
                 meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
@@ -543,7 +543,7 @@ public class GenericEntityServiceResource {
                 LOG.error("Data storage is null");
                 throw new IllegalDataStorageException("Data storage is null");
             }
-            
+
             DeleteStatement deleteStatement = new DeleteStatement(rawQuery);
             ModifyResult<String> deleteResult = deleteStatement.execute(dataStorage);
             if(deleteResult.isSuccess()){
@@ -627,4 +627,4 @@ public class GenericEntityServiceResource {
         }
         return response;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index 6819e59..d4c0e0c 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 
@@ -33,8 +34,13 @@ import java.util.Map;
 public class ExampleStormApplication extends StormApplication<ExampleStormConfig> {
     @Override
     public StormTopology execute(ExampleStormConfig config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
         builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
         builder.setBolt("sink_2",environment.getFlattenStreamSink("SAMPLE_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
         return builder.createTopology();
@@ -59,4 +65,4 @@ public class ExampleStormApplication extends StormApplication<ExampleStormConfig
             outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index 45dd7bd..88dd02d 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -26,10 +26,13 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.resource.SiteResource;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 @RunWith(AppUnitTestRunner.class)
 public class ExampleApplicationProviderTest {
@@ -54,6 +57,7 @@ public class ExampleApplicationProviderTest {
      * @throws InterruptedException
      */
     @Test
+    @Ignore
     public void testApplicationLifecycle() throws InterruptedException {
         // Create local site
         SiteEntity siteEntity = new SiteEntity();
@@ -63,8 +67,10 @@ public class ExampleApplicationProviderTest {
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
 
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
@@ -81,16 +87,29 @@ public class ExampleApplicationProviderTest {
 
     @Test
     public void testApplicationQuickRunWithAppType(){
-        simulator.start("EXAMPLE_APPLICATION");
+        simulator.start("EXAMPLE_APPLICATION", getConf());
     }
 
+    @Ignore
     @Test
-    public void testApplicationQuickRunWithAppProvider(){
-        simulator.start(ExampleApplicationProvider.class);
+    public void testApplicationQuickRunWithAppProvider() throws Exception{
+        simulator.start(ExampleApplicationProvider.class, getConf());
     }
 
+    @Ignore
     @Test
-    public void testApplicationQuickRunWithAppProvider2(){
-        simulator.start(ExampleApplicationProvider2.class);
+    public void testApplicationQuickRunWithAppProvider2() throws Exception{
+        simulator.start(ExampleApplicationProvider2.class, getConf());
     }
-}
\ No newline at end of file
+
+    private Map<String, Object> getConf(){
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "testTopic");
+        conf.put("dataSinkConfig.brokerList", "broker");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index bfbf20e..2873037 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -56,7 +56,15 @@
 		}
 	},
 
-	"org.apache.eagle.app.example.ExampleStormApplication": {
 		"appId": "unit_test_example_app"
+	"spoutNum": 3
+	"loaded": true
+	"mode":"LOCAL"
+
+	"dataSinkConfig": {
+		"topic" : "test_topic",
+		"brokerList" : "sandbox.hortonworks.com:6667",
+		"serializerClass" : "kafka.serializer.StringEncoder",
+		"keySerializerClass" : "kafka.serializer.StringEncoder"
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index 89e5433..25506cc 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -23,6 +23,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 
@@ -32,6 +33,11 @@ import java.util.Map;
 public class JPMApplication extends StormApplication<JPMConfiguration> {
     @Override
     public StormTopology execute(JPMConfiguration config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("metric_spout", new RandomEventSpout(), 4);
         builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
@@ -57,4 +63,4 @@ public class JPMApplication extends StormApplication<JPMConfiguration> {
             outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
index 30bbc96..c955d36 100644
--- a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
@@ -27,6 +27,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @RunWith(AppUnitTestRunner.class)
 public class JPMApplicationTest {
     @Inject
@@ -53,8 +56,11 @@ public class JPMApplicationTest {
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
 
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
+
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
@@ -68,4 +74,15 @@ public class JPMApplicationTest {
             // Expected exception
         }
     }
+
+    private Map<String, Object> getConf(){
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "testTopic");
+        conf.put("dataSinkConfig.brokerList", "broker");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml
index 83971f2..18f9bd0 100644
--- a/eagle-security/eagle-security-common/pom.xml
+++ b/eagle-security/eagle-security-common/pom.xml
@@ -52,6 +52,11 @@
       <artifactId>eagle-alert-service</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-metadata-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
 	<dependency>
 		<groupId>org.json</groupId>
 		<artifactId>json</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
new file mode 100644
index 0000000..e3c9f95
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * service stub to get metadata from remote metadata service
+ */
+public interface IMetadataServiceClient extends Closeable, Serializable {
+    Collection<HBaseSensitivityEntity> listHBaseSensitivies();
+    OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
index 45e729e..534fb38 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.security.service;
 
 import java.util.Collection;
+
 /**
  * Since 6/10/16.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
index 53d7132..27aeb57 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
@@ -28,7 +28,7 @@ import java.util.*;
 /**
  * In memory service for simple service start. Make all service API as
  * synchronized.
- * 
+ *
  * @since Apr 11, 2016
  *
  */
@@ -39,8 +39,7 @@ public class InMemMetadataDaoImpl implements ISecurityMetadataDAO {
     private Map<Pair<String, String>, HBaseSensitivityEntity> hBaseSensitivityEntities = new HashMap<>();
 
     @Inject
-    public InMemMetadataDaoImpl(Config config) {
-        
+    public InMemMetadataDaoImpl() {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
index f17fd43..65e86f0 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
@@ -17,7 +17,6 @@
 package org.apache.eagle.security.service;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,41 +27,28 @@ import java.lang.reflect.Constructor;
  *
  */
 public class MetadataDaoFactory {
-
-    private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
     private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
 
-    private ISecurityMetadataDAO dao;
-
-    private MetadataDaoFactory() {
-        Config config = ConfigFactory.load();
-        Config datastoreConfig = config.getConfig("datastore");
-        if (datastoreConfig == null) {
-            LOG.warn("datastore is not configured, use in-memory store !!!");
-            dao = new InMemMetadataDaoImpl(datastoreConfig);
+    public static ISecurityMetadataDAO getMetadataDAO(String storeCls) {
+        ISecurityMetadataDAO dao = null;
+        if (storeCls == null) {
+            LOG.warn("metadata store is not configured, use in-memory store !!!");
+            dao = new InMemMetadataDaoImpl();
         } else {
-            String clsName = datastoreConfig.getString("metadataDao");
             Class<?> clz;
             try {
-                clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
+                clz = Thread.currentThread().getContextClassLoader().loadClass(storeCls);
                 if (ISecurityMetadataDAO.class.isAssignableFrom(clz)) {
-                    Constructor<?> cotr = clz.getConstructor(Config.class);
-                    dao = (ISecurityMetadataDAO) cotr.newInstance(datastoreConfig);
+                    Constructor<?> cotr = clz.getConstructor();
+                    dao = (ISecurityMetadataDAO) cotr.newInstance();
                 } else {
                     throw new Exception("metadataDao configuration need to be implementation of IMetadataDao! ");
                 }
             } catch (Exception e) {
                 LOG.error("error when initialize the dao, fall back to in memory mode!", e);
-                dao = new InMemMetadataDaoImpl(datastoreConfig);
+                dao = new InMemMetadataDaoImpl();
             }
         }
-    }
-
-    public static MetadataDaoFactory getInstance() {
-        return INSTANCE;
-    }
-
-    public ISecurityMetadataDAO getMetadataDao() {
         return dao;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
new file mode 100644
index 0000000..676bde5
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+    private static final long serialVersionUID = 3003976065082684128L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+    private static final String METADATA_LIST_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+    private static final String METADATA_ADD_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+
+    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+    private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+    private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+    protected static final String CONTENT_TYPE = "Content-Type";
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    private String basePath;
+
+    public MetadataServiceClientImpl(Config config) {
+        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
+                .getString(EAGLE_CORRELATION_CONTEXT));
+        basePath = buildBasePath();
+    }
+
+    public MetadataServiceClientImpl(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        this.client = Client.create(cc);
+        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+    }
+
+    private String buildBasePath() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("http://");
+        sb.append(host);
+        sb.append(":");
+        sb.append(port);
+        sb.append(context);
+        return sb.toString();
+    }
+
+    private <T> List<T> list(String path, GenericType<List<T>> type) {
+        WebResource r = client.resource(basePath + path);
+        LOG.info("query URL {}", basePath + path);
+        List<T> ret = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
+        return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.destroy();
+    }
+
+    @Override
+    public Collection<HBaseSensitivityEntity> listHBaseSensitivies() {
+        return list(METADATA_LIST_HBASE_SENSITIVITY_PATH, new GenericType<List<HBaseSensitivityEntity>>() {
+        });
+    }
+
+    @Override
+    public OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h) {
+        WebResource r = client.resource(basePath + METADATA_ADD_HBASE_SENSITIVITY_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(h);
+        return new OpResult();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
deleted file mode 100644
index 05440fb..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.service;
-
-import javax.ws.rs.Path;
-
-/**
- * Since 6/10/16.
- */
-@Path("/metadata/sensitivity")
-public class SensitivityMetadataResource {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
index e60e59e..3401b3c 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
@@ -56,8 +56,6 @@ public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
         String groupId = context.getString("consumerGroupId");
         // Kafka fetch size
         int fetchSize = context.getInt("fetchSize");
-        // Kafka deserializer class
-        String deserClsName = context.getString("deserializerClass");
         // Kafka broker zk connection
         String zkConnString = context.getString("zkConnection");
         // transaction zkRoot

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
new file mode 100644
index 0000000..662311c
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppConf extends Configuration{
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
new file mode 100644
index 0000000..051d8c4
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppProvider extends AbstractApplicationProvider<HBaseAuditLogApplication> {
+    public HBaseAuditLogAppProvider() {
+        super("/META-INF/metadata.xml");
+    }
+
+    @Override
+    public HBaseAuditLogApplication getApplication() {
+        return new HBaseAuditLogApplication();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 49393e1..3d80308 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -16,27 +16,35 @@
  */
 package org.apache.eagle.security.hbase;
 
+import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
 import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
 import storm.kafka.StringScheme;
-import storm.kafka.bolt.KafkaBolt;
 
 /**
  * Since 7/27/16.
  */
-public class HBaseAuditLogApplication{
+public class HBaseAuditLogApplication extends StormApplication<HBaseAuditLogAppConf> {
     public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
     public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
     public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
     public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-    protected void buildApp(TopologyBuilder builder) {
-        System.setProperty("config.resource", "/application.conf");
-        Config config = ConfigFactory.load();
+    @Override
+    public StormTopology execute(HBaseAuditLogAppConf config1, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
         NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
         IRichSpout spout = provider.getSpout(config);
 
@@ -55,8 +63,15 @@ public class HBaseAuditLogApplication{
         BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
         joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
 
-        KafkaBolt kafkaBolt = new KafkaBolt();
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
-        kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+        StormStreamSink sinkBolt = environment.getFlattenStreamSink("hbase_audit_log_stream",config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        HBaseAuditLogApplication app = new HBaseAuditLogApplication();
+        app.run(config);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
deleted file mode 100644
index 13ca214..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
-import org.apache.eagle.security.topo.TopologySubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.bolt.KafkaBolt;
-
-public class HbaseAuditLogMonitoringMain {
-    private static Logger LOG = LoggerFactory.getLogger(HbaseAuditLogMonitoringMain.class);
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
-    public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
-    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-
-    public static void main(String[] args) throws Exception{
-        System.setProperty("config.resource", "/application.conf");
-        Config config = ConfigFactory.load();
-        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
-        IRichSpout spout = provider.getSpout(config);
-
-        HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
-        TopologyBuilder builder = new TopologyBuilder();
-
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
-        int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
-        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
-
-        builder.setSpout("ingest", spout, numOfSpoutTasks);
-        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
-        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-
-        HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
-        BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
-        joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
-
-        KafkaBolt kafkaBolt = new KafkaBolt();
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
-        kafkaBoltDeclarer.shuffleGrouping("joinBolt");
-
-        StormTopology topology = builder.createTopology();
-
-        TopologySubmitter.submit(topology, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index cf486d3..d8d9d6b 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -23,14 +23,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -94,9 +92,7 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
             }
             LOG.info("After hbase resource sensitivity lookup: " + newEvent);
             // push to Kafka sink
-            ObjectMapper mapper = new ObjectMapper();
-            String msg = mapper.writeValueAsString(map);
-            collector.emit(Arrays.asList(newEvent.get("user"), msg));
+            collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
         }catch(Exception ex){
             LOG.error("error joining data, ignore it", ex);
         }finally {
@@ -106,6 +102,6 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+        declarer.declare(new Fields("user", "message"));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
index 9ca0701..603ed5a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
@@ -19,10 +19,8 @@ package org.apache.eagle.security.hbase;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Maps;
-import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
-import org.apache.eagle.security.service.HBaseSensitivityEntity;
-import org.apache.eagle.security.service.ISecurityMetadataDAO;
-import org.apache.eagle.security.service.MetadataDaoFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.*;
 import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.quartz.Job;
@@ -33,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitivityPollingJob implements Job {
@@ -44,12 +41,33 @@ public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitiv
             throws JobExecutionException {
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
         try {
-            ISecurityMetadataDAO dao = MetadataDaoFactory.getInstance().getMetadataDao();
-            Collection<HBaseSensitivityEntity> sensitivityEntities = dao.listHBaseSensitivies();
-            ExternalDataCache.getInstance().setJobResult(getClass(), sensitivityEntities);
+            Collection<HBaseSensitivityEntity> sensitivityEntities = load(jobDataMap);
+            Map<String, HBaseSensitivityEntity> map = Maps.uniqueIndex(
+                    sensitivityEntities,
+                    new Function<HBaseSensitivityEntity, String>() {
+                        @Override
+                        public String apply(HBaseSensitivityEntity input) {
+                            return input.getHbaseResource();
+                        }
+                    });
+            ExternalDataCache.getInstance().setJobResult(getClass(), map);
         } catch(Exception ex) {
         	LOG.error("Fail to load hbase resource sensitivity data", ex);
         }
     }
 
-}
\ No newline at end of file
+    private Collection<HBaseSensitivityEntity> load(JobDataMap jobDataMap) throws Exception {
+        Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
+        String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
+        Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
+        String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
+        String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
+
+        // load from eagle database
+        LOG.info("Load hbase resource sensitivity information from eagle service "
+                + eagleServiceHost + ":" + eagleServicePort);
+
+        IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+        return client.listHBaseSensitivies();
+    }
+}


Mime
View raw message