eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject incubator-eagle git commit: [EAGLE-458] Migrate eagle-jpm-spark-running using application framework
Date Fri, 19 Aug 2016 04:17:10 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 9432fcf91 -> acee5cb33


[EAGLE-458] Migrate eagle-jpm-spark-running using application framework

https://issues.apache.org/jira/browse/EAGLE-458

Author: Hao Chen <hao@apache.org>

Closes #335 from haoch/EAGLE-458.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/acee5cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/acee5cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/acee5cb3

Branch: refs/heads/develop
Commit: acee5cb334a266ed1f7cb215d0f8252fd5b4e067
Parents: 9432fcf
Author: Hao Chen <hao@apache.org>
Authored: Fri Aug 19 12:16:57 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Aug 19 12:16:57 2016 +0800

----------------------------------------------------------------------
 .../eagle/app/service/ApplicationContext.java   |  20 +-
 .../apache/eagle/app/test/AppJUnitRunner.java   |   2 +-
 .../eagle/app/test/AppTestGuiceModule.java      |  42 ----
 .../eagle/app/test/ApplicationSimulator.java    |  70 +++++++
 .../app/test/ApplicationSimulatorImpl.java      |  92 +++++++++
 .../eagle/app/test/ApplicationTestBase.java     |  43 ++++
 .../app/test/ApplicationTestGuiceModule.java    |  42 ++++
 .../apache/eagle/app/test/ServerSimulator.java  |  70 -------
 .../eagle/app/test/ServerSimulatorImpl.java     |  92 ---------
 .../example/ExampleApplicationProviderTest.java |   4 +-
 eagle-jpm/eagle-jpm-spark-running/pom.xml       |   5 +
 .../jpm/spark/running/SparkRunningJobApp.java   |  67 +++++++
 .../spark/running/SparkRunningJobAppConfig.java | 175 +++++++++++++++++
 .../running/SparkRunningJobAppProvider.java     |  26 +++
 .../jpm/spark/running/SparkRunningJobMain.java  |  61 +-----
 .../common/SparkRunningConfigManager.java       | 151 --------------
 .../parser/SparkAppEntityCreationHandler.java   |   6 +-
 .../running/parser/SparkApplicationParser.java  |  10 +-
 .../running/recover/SparkRunningJobManager.java |   4 +-
 .../storm/SparkRunningJobFetchSpout.java        |  14 +-
 .../running/storm/SparkRunningJobParseBolt.java |  18 +-
 ...spark.running.SparkRunningJobAppProvider.xml | 195 +++++++++++++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  16 ++
 .../src/main/resources/application.conf         |   9 +-
 .../java/SparkRunningJobAppProviderTest.java    |  32 +++
 pom.xml                                         |   1 +
 26 files changed, 808 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 76ee289..91d33ca 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -73,15 +73,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
 
     @Override
     public void onInstall() {
-        List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
-            StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(),this.config);
-            StreamDesc streamDesc = new StreamDesc();
-            streamDesc.setSchema(streamDefinition);
-            streamDesc.setSink(streamSinkConfig);
-            streamDesc.setStreamId(streamDefinition.getStreamId());
-            return streamDesc;
-        })).collect(Collectors.toList());
-        metadata.setStreams(streamDescCollection);
+        if(metadata.getDescriptor().getStreams()!=null) {
+            List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+                StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(), this.config);
+                StreamDesc streamDesc = new StreamDesc();
+                streamDesc.setSchema(streamDefinition);
+                streamDesc.setSink(streamSinkConfig);
+                streamDesc.setStreamId(streamDefinition.getStreamId());
+                return streamDesc;
+            })).collect(Collectors.toList());
+            metadata.setStreams(streamDescCollection);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
index 572af2c..b8174bb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
@@ -52,7 +52,7 @@ public class AppJUnitRunner extends BlockJUnit4ClassRunner {
             throws InitializationError {
         final List<Module> modules = new ArrayList<>();
 
-        AppTestGuiceModule testGuiceModule = new AppTestGuiceModule();
+        ApplicationTestGuiceModule testGuiceModule = new ApplicationTestGuiceModule();
 
         // Add default modules
         modules.add(testGuiceModule);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
deleted file mode 100644
index 9b30ee4..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import org.apache.eagle.app.module.ApplicationExtensionLoader;
-import org.apache.eagle.app.module.ApplicationGuiceModule;
-import org.apache.eagle.common.module.CommonGuiceModule;
-import org.apache.eagle.common.module.GlobalScope;
-import org.apache.eagle.common.module.ModuleRegistry;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
-
-public class AppTestGuiceModule extends AbstractModule{
-    @Override
-    protected void configure() {
-        CommonGuiceModule common = new CommonGuiceModule();
-        ApplicationGuiceModule app = new ApplicationGuiceModule();
-        MemoryMetadataStore store = new MemoryMetadataStore();
-        install(common);
-        install(app);
-        install(store);
-        ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store);
-        registry.getModules(store.getClass()).forEach(this::install);
-        registry.getModules(GlobalScope.class).forEach(this::install);
-        bind(ServerSimulator.class).to(ServerSimulatorImpl.class).in(Singleton.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
new file mode 100644
index 0000000..3e4aa21
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Module;
+import org.apache.eagle.app.spi.ApplicationProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Application test simulator for developer to quickly run application without diving into application lifecycle
+ */
+public abstract class ApplicationSimulator {
+    /**
+     *
+     * @param appType
+     */
+    public abstract void start(String appType);
+
+    /**
+     *
+     * @param appType
+     * @param appConfig
+     */
+    public abstract void start(String appType, Map<String,Object> appConfig);
+
+    /**
+     *
+     * @param appProviderClass
+     */
+    public abstract void start(Class<? extends ApplicationProvider> appProviderClass);
+
+    /**
+     *
+     * @param appProviderClass
+     * @param appConfig
+     */
+    public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
+
+    public static ApplicationSimulator getInstance(){
+        return Guice.createInjector(new ApplicationTestGuiceModule()).getInstance(ApplicationSimulator.class);
+    }
+
+    /**
+     * @param modules additional modules
+     * @return ApplicationSimulator instance
+     */
+    public static ApplicationSimulator getInstance(Module ... modules){
+        List<Module> contextModules = Arrays.asList(modules);
+        contextModules.add(new ApplicationTestGuiceModule());
+        return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
new file mode 100644
index 0000000..35dead2
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ApplicationSimulatorImpl extends ApplicationSimulator {
+    private final Config config;
+    private final SiteResource siteResource;
+    private final ApplicationResource applicationResource;
+
+    @Inject
+    public ApplicationSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){
+        this.config = config;
+        this.siteResource = siteResource;
+        this.applicationResource = applicationResource;
+    }
+
+    @Override
+    public void start(String appType) {
+        start(appType, new HashMap<>());
+    }
+
+    @Override
+    public void start(String appType, Map<String, Object> appConfig) {
+        SiteEntity siteEntity = getUniqueSite();
+        siteResource.createSite(siteEntity);
+        Assert.assertNotNull(siteEntity.getUuid());
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(appConfig);
+        // Install application
+        ApplicationEntity applicationEntity =
+                applicationResource.installApplication(installOperation).getData();
+        // Start application
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+    }
+
+    private final static AtomicInteger incr = new AtomicInteger();
+
+    private SiteEntity getUniqueSite(){
+        // Create local site
+        SiteEntity siteEntity = new SiteEntity();
+        siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
+        siteEntity.setSiteName(siteEntity.getSiteId());
+        siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
+        return siteEntity;
+    }
+
+    @Override
+    public void start(Class<? extends ApplicationProvider> appProviderClass) {
+        start(appProviderClass, new HashMap<>());
+    }
+
+    @Override
+    public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) {
+        try {
+            ApplicationProvider applicationProvider = appProviderClass.newInstance();
+            applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
+            start(applicationProvider.getApplicationDesc().getType(),appConfig);
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(),e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
new file mode 100644
index 0000000..1c7d6be
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.eagle.common.module.CommonGuiceModule;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+import org.junit.After;
+import org.junit.Before;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
+public class ApplicationTestBase {
+    private Injector injector;
+
+    @Before
+    public void setUp(){
+        injector = Guice.createInjector(new ApplicationTestGuiceModule());
+        injector.injectMembers(this);
+    }
+
+    public Injector injector(){
+        return injector;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
new file mode 100644
index 0000000..4f7b2b4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import org.apache.eagle.app.module.ApplicationExtensionLoader;
+import org.apache.eagle.app.module.ApplicationGuiceModule;
+import org.apache.eagle.common.module.CommonGuiceModule;
+import org.apache.eagle.common.module.GlobalScope;
+import org.apache.eagle.common.module.ModuleRegistry;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+
+public class ApplicationTestGuiceModule extends AbstractModule{
+    @Override
+    protected void configure() {
+        CommonGuiceModule common = new CommonGuiceModule();
+        ApplicationGuiceModule app = new ApplicationGuiceModule();
+        MemoryMetadataStore store = new MemoryMetadataStore();
+        install(common);
+        install(app);
+        install(store);
+        ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store);
+        registry.getModules(store.getClass()).forEach(this::install);
+        registry.getModules(GlobalScope.class).forEach(this::install);
+        bind(ApplicationSimulator.class).to(ApplicationSimulatorImpl.class).in(Singleton.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
deleted file mode 100644
index a91af77..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Guice;
-import com.google.inject.Module;
-import org.apache.eagle.app.spi.ApplicationProvider;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Application test simulator for developer to quickly run application without diving into application lifecycle
- */
-public abstract class ServerSimulator {
-    /**
-     *
-     * @param appType
-     */
-    public abstract void start(String appType);
-
-    /**
-     *
-     * @param appType
-     * @param appConfig
-     */
-    public abstract void start(String appType, Map<String,Object> appConfig);
-
-    /**
-     *
-     * @param appProviderClass
-     */
-    public abstract void start(Class<? extends ApplicationProvider> appProviderClass);
-
-    /**
-     *
-     * @param appProviderClass
-     * @param appConfig
-     */
-    public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
-
-    public static ServerSimulator getInstance(){
-        return Guice.createInjector(new AppTestGuiceModule()).getInstance(ServerSimulator.class);
-    }
-
-    /**
-     * @param modules additional modules
-     * @return ServerSimulator instance
-     */
-    public static ServerSimulator getInstance(Module ... modules){
-        List<Module> contextModules = Arrays.asList(modules);
-        contextModules.add(new AppTestGuiceModule());
-        return Guice.createInjector(contextModules).getInstance(ServerSimulator.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
deleted file mode 100644
index 1ef91ff..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.junit.Assert;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ServerSimulatorImpl extends ServerSimulator {
-    private final Config config;
-    private final SiteResource siteResource;
-    private final ApplicationResource applicationResource;
-
-    @Inject
-    public ServerSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){
-        this.config = config;
-        this.siteResource = siteResource;
-        this.applicationResource = applicationResource;
-    }
-
-    @Override
-    public void start(String appType) {
-        start(appType, new HashMap<>());
-    }
-
-    @Override
-    public void start(String appType, Map<String, Object> appConfig) {
-        SiteEntity siteEntity = getUniqueSite();
-        siteResource.createSite(siteEntity);
-        Assert.assertNotNull(siteEntity.getUuid());
-        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
-        installOperation.setConfiguration(appConfig);
-        // Install application
-        ApplicationEntity applicationEntity =
-                applicationResource.installApplication(installOperation).getData();
-        // Start application
-        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
-    }
-
-    private final static AtomicInteger incr = new AtomicInteger();
-
-    private SiteEntity getUniqueSite(){
-        // Create local site
-        SiteEntity siteEntity = new SiteEntity();
-        siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
-        siteEntity.setSiteName(siteEntity.getSiteId());
-        siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
-        return siteEntity;
-    }
-
-    @Override
-    public void start(Class<? extends ApplicationProvider> appProviderClass) {
-        start(appProviderClass, new HashMap<>());
-    }
-
-    @Override
-    public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) {
-        try {
-            ApplicationProvider applicationProvider = appProviderClass.newInstance();
-            applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
-            start(applicationProvider.getApplicationDesc().getType(),appConfig);
-        } catch (InstantiationException | IllegalAccessException e) {
-            throw new IllegalStateException(e.getMessage(),e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index e07f487..1c801bd 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -21,7 +21,7 @@ import org.apache.eagle.app.example.extensions.ExampleEntity;
 import org.apache.eagle.app.example.extensions.ExampleResource;
 import org.apache.eagle.app.resource.ApplicationResource;
 import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.test.ServerSimulator;
+import org.apache.eagle.app.test.ApplicationSimulator;
 import org.apache.eagle.app.test.AppJUnitRunner;
 import org.apache.eagle.common.module.GlobalScope;
 import org.apache.eagle.metadata.model.ApplicationDesc;
@@ -41,7 +41,7 @@ import java.util.Map;
 public class ExampleApplicationProviderTest {
     @Inject private SiteResource siteResource;
     @Inject private ApplicationResource applicationResource;
-    @Inject private ServerSimulator simulator;
+    @Inject private ApplicationSimulator simulator;
     @Inject private ExampleResource exampleResource;
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index cc53e7c..34d8545 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -132,6 +132,11 @@
           <artifactId>hadoop-mapreduce-client-core</artifactId>
           <version>${hadoop.version}</version>
       </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-app-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
   </dependencies>
   <build>
       <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
new file mode 100644
index 0000000..61c0751
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.spark.running;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+
+public class SparkRunningJobApp extends StormApplication {
+    /**
+     * Builds the topology: one fetch spout feeding one parse bolt. For each component the
+     * parallelism handed to Storm is the configured value capped at the configured task count.
+     */
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        final SparkRunningJobAppConfig appConfig = SparkRunningJobAppConfig.getInstance(config);
+        final SparkRunningJobAppConfig.TopologyConfig topology = appConfig.getTopologyConfig();
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // Fetch spout: parallelism never exceeds the number of tasks.
+        final int spoutTasks = topology.jobFetchSpoutTasksNum;
+        final int spoutExecutors = Math.min(topology.jobFetchSpoutParallism, spoutTasks);
+        final SparkRunningJobFetchSpout fetchSpout = new SparkRunningJobFetchSpout(
+                appConfig.getJobExtractorConfig(),
+                appConfig.getEndpointConfig(),
+                appConfig.getZkStateConfig());
+        builder
+                .setSpout(SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME, fetchSpout, spoutExecutors)
+                .setNumTasks(spoutTasks);
+
+        // Parse bolt: same cap; subscribes to the spout with tuples grouped on the "appId" field.
+        final int boltTasks = topology.jobParseBoltTasksNum;
+        final int boltExecutors = Math.min(topology.jobParseBoltParallism, boltTasks);
+        final SparkRunningJobParseBolt parseBolt = new SparkRunningJobParseBolt(
+                appConfig.getZkStateConfig(),
+                appConfig.getEagleServiceConfig(),
+                appConfig.getEndpointConfig(),
+                appConfig.getJobExtractorConfig());
+        builder
+                .setBolt(SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME, parseBolt, boltExecutors)
+                .setNumTasks(boltTasks)
+                .fieldsGrouping(
+                        SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME,
+                        new Fields("appId"));
+
+        return builder.createTopology();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
new file mode 100644
index 0000000..668bc02
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class SparkRunningJobAppConfig implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobAppConfig.class);
+    static final String JOB_FETCH_SPOUT_NAME = "sparkRunningJobFetchSpout";
+    static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
+
+    public String getEnv() {
+        return env;
+    }
+    private String env;
+
+    ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+    private ZKStateConfig zkStateConfig;
+    private TopologyConfig topologyConfig;
+    public TopologyConfig getTopologyConfig(){
+        return topologyConfig;
+    }
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+    private EagleServiceConfig eagleServiceConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+    private EndpointConfig endpointConfig;
+
+    public static class TopologyConfig implements Serializable {
+        public int jobFetchSpoutParallism;
+        public int jobFetchSpoutTasksNum;
+        public int jobParseBoltParallism;
+        public int jobParseBoltTasksNum;
+    }
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+        public boolean recoverEnabled;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseThreadPoolSize;
+    }
+
+    public static class EndpointConfig implements Serializable {
+        public String nnEndpoint;
+        public String eventLog;
+        public String[] rmUrls;
+        public String principal;
+        public String keyTab;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+    private Config config;
+
+    private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig();
+
+    private SparkRunningJobAppConfig() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+        this.topologyConfig = new TopologyConfig();
+    }
+
+    public static SparkRunningJobAppConfig getInstance(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            manager.init(new ConfigOptionParser().load(args));
+        } catch (Exception e) {
+            LOG.error("failed to load config");
+        }
+        return manager;
+    }
+
+    public static SparkRunningJobAppConfig getInstance(Config config) {
+        manager.init(config);
+        return manager;
+    }
+
+    private void init(Config config){
+        this.config = config;
+        this.env = config.getString("envContextConfig.env");
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+        this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
+
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+
+        //parse job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
+
+        //parse data source config
+        this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
+        this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+        this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
+
+        this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+
+        this.topologyConfig.jobFetchSpoutParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_FETCH_SPOUT_NAME);
+        this.topologyConfig.jobFetchSpoutTasksNum = config.getInt("envContextConfig.tasks." + JOB_FETCH_SPOUT_NAME);
+        this.topologyConfig.jobParseBoltParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_PARSE_BOLT_NAME);
+        this.topologyConfig.jobParseBoltTasksNum = config.getInt("envContextConfig.tasks." + JOB_PARSE_BOLT_NAME);
+
+        LOG.info("Successfully initialized SparkRunningJobAppConfig");
+        LOG.info("env: " + this.env);
+        LOG.info("site: " + this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
new file mode 100644
index 0000000..3d20af7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.spark.running;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class SparkRunningJobAppProvider extends AbstractApplicationProvider<SparkRunningJobApp> {
+    @Override
+    public SparkRunningJobApp getApplication() {
+        return new SparkRunningJobApp(); // a new application instance on every call
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
index 749f4d1..fe4a68c 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
@@ -18,67 +18,8 @@
 
 package org.apache.eagle.jpm.spark.running;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
-import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
-import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
-
 public class SparkRunningJobMain {
     public static void main(String[] args) {
-        try {
-            //1. trigger init conf
-            SparkRunningConfigManager sparkRunningConfigManager = SparkRunningConfigManager.getInstance(args);
-
-            //2. init topology
-            TopologyBuilder topologyBuilder = new TopologyBuilder();
-            String topologyName = sparkRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
-            String spoutName = "sparkRunningJobFetchSpout";
-            String boltName = "sparkRunningJobParseBolt";
-            int parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
-            int tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
-            if (parallelism > tasks) {
-                parallelism = tasks;
-            }
-            topologyBuilder.setSpout(
-                    spoutName,
-                    new SparkRunningJobFetchSpout(
-                            sparkRunningConfigManager.getJobExtractorConfig(),
-                            sparkRunningConfigManager.getEndpointConfig(),
-                            sparkRunningConfigManager.getZkStateConfig()),
-                    parallelism
-            ).setNumTasks(tasks);
-
-            parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
-            tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
-            if (parallelism > tasks) {
-                parallelism = tasks;
-            }
-            topologyBuilder.setBolt(boltName,
-                    new SparkRunningJobParseBolt(
-                            sparkRunningConfigManager.getZkStateConfig(),
-                            sparkRunningConfigManager.getEagleServiceConfig(),
-                            sparkRunningConfigManager.getEndpointConfig(),
-                            sparkRunningConfigManager.getJobExtractorConfig()),
-                    parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
-
-            backtype.storm.Config config = new backtype.storm.Config();
-            config.setNumWorkers(sparkRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
-            config.put(Config.TOPOLOGY_DEBUG, true);
-            if (!sparkRunningConfigManager.getEnv().equals("local")) {
-                //cluster mode
-                //parse conf here
-                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            } else {
-                //local mode
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        new SparkRunningJobApp().run(args);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
deleted file mode 100644
index b05d12e..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.spark.running.common;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class SparkRunningConfigManager implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningConfigManager.class);
-
-    public String getEnv() {
-        return env;
-    }
-    private String env;
-
-    public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
-    private ZKStateConfig zkStateConfig;
-
-    public EagleServiceConfig getEagleServiceConfig() {
-        return eagleServiceConfig;
-    }
-    private EagleServiceConfig eagleServiceConfig;
-
-    public JobExtractorConfig getJobExtractorConfig() {
-        return jobExtractorConfig;
-    }
-    private JobExtractorConfig jobExtractorConfig;
-
-    public EndpointConfig getEndpointConfig() {
-        return endpointConfig;
-    }
-    private EndpointConfig endpointConfig;
-
-    public static class ZKStateConfig implements Serializable {
-        public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-        public String zkPort;
-        public boolean recoverEnabled;
-    }
-
-    public static class EagleServiceConfig implements Serializable {
-        public String eagleServiceHost;
-        public int eagleServicePort;
-        public int readTimeoutSeconds;
-        public int maxFlushNum;
-        public String username;
-        public String password;
-    }
-
-    public static class JobExtractorConfig implements Serializable {
-        public String site;
-        public int fetchRunningJobInterval;
-        public int parseThreadPoolSize;
-    }
-
-    public static class EndpointConfig implements Serializable {
-        public String nnEndpoint;
-        public String eventLog;
-        public String[] rmUrls;
-        public String principal;
-        public String keyTab;
-    }
-
-    public Config getConfig() {
-        return config;
-    }
-    private Config config;
-
-    private static SparkRunningConfigManager manager = new SparkRunningConfigManager();
-
-    private SparkRunningConfigManager() {
-        this.eagleServiceConfig = new EagleServiceConfig();
-        this.jobExtractorConfig = new JobExtractorConfig();
-        this.endpointConfig = new EndpointConfig();
-        this.zkStateConfig = new ZKStateConfig();
-    }
-
-    public static SparkRunningConfigManager getInstance(String[] args) {
-        manager.init(args);
-        return manager;
-    }
-
-    private void init(String[] args) {
-        try {
-            LOG.info("Loading from configuration file");
-            this.config = new ConfigOptionParser().load(args);
-        } catch (Exception e) {
-            LOG.error("failed to load config");
-        }
-
-        this.env = config.getString("envContextConfig.env");
-
-        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
-        this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
-
-        // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
-        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
-        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
-        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
-        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
-
-        //parse job extractor
-        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
-        this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
-
-        //parse data source config
-        this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
-        this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
-        this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
-        this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
-        this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
-
-        LOG.info("Successfully initialized SparkRunningConfigManager");
-        LOG.info("env: " + this.env);
-        LOG.info("site: " + this.jobExtractorConfig.site);
-        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
-        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
index 5491a80..92adfa8 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.spark.running.parser;
 
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -32,9 +32,9 @@ public class SparkAppEntityCreationHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SparkAppEntityCreationHandler.class);
 
     private List<TaggedLogAPIEntity> entities = new ArrayList<>();
-    private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig;
 
-    public SparkAppEntityCreationHandler(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+    public SparkAppEntityCreationHandler(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig) {
         this.eagleServiceConfig = eagleServiceConfig;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index bb76213..b2a5b63 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.spark.running.parser;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.spark.crawl.EventType;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.*;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
 import org.apache.eagle.jpm.util.Constants;
@@ -64,7 +64,7 @@ public class SparkApplicationParser implements Runnable {
     private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime;
     private Set<Integer> completeStages;
     private Configuration hdfsConf;
-    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
     private final Object lock = new Object();
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
     private Map<String, String> commonTags = new HashMap<>();
@@ -78,9 +78,9 @@ public class SparkApplicationParser implements Runnable {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
     }
 
-    public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
-                                  SparkRunningConfigManager.EndpointConfig endpointConfig,
-                                  SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+    public SparkApplicationParser(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
+                                  SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+                                  SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig,
                                   AppInfo app, Map<String, SparkAppEntity> sparkApp,
                                   SparkRunningJobManager sparkRunningJobManager, ResourceFetcher rmResourceFetcher) {
         this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 2b6c62f..11f7909 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.spark.running.recover;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
 import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
@@ -30,7 +30,7 @@ import java.util.*;
 public class SparkRunningJobManager implements Serializable {
     private RunningJobManager runningJobManager;
 
-    public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) {
+    public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) {
         this.runningJobManager = new RunningJobManager(config.zkQuorum,
                 config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 6be0cfd..256829e 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -24,7 +24,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
 import org.apache.eagle.jpm.util.Constants;
@@ -40,18 +40,18 @@ import java.util.*;
 public class SparkRunningJobFetchSpout extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class);
 
-    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
-    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
-    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig;
+    private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig;
+    private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
     private ResourceFetcher resourceFetcher;
     private SpoutOutputCollector collector;
     private boolean init;
     private transient SparkRunningJobManager sparkRunningJobManager;
     private Set<String> runningYarnApps;
 
-    public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
-                                     SparkRunningConfigManager.EndpointConfig endpointConfig,
-                                     SparkRunningConfigManager.ZKStateConfig zkStateConfig) {
+    public SparkRunningJobFetchSpout(SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig,
+                                     SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+                                     SparkRunningJobAppConfig.ZKStateConfig zkStateConfig) {
         this.jobExtractorConfig = jobExtractorConfig;
         this.endpointConfig = endpointConfig;
         this.zkStateConfig = zkStateConfig;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 6928240..d207ffc 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -23,7 +23,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
@@ -44,17 +44,17 @@ import java.util.concurrent.ExecutorService;
 public class SparkRunningJobParseBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class);
 
-    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
-    private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
-    private SparkRunningConfigManager.EndpointConfig endpointConfig;
-    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig;
+    private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig;
+    private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
+    private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig;
     private ExecutorService executorService;
     private Map<String, SparkApplicationParser> runningSparkParsers;
     private ResourceFetcher resourceFetcher;
-    public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig,
-                                    SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
-                                    SparkRunningConfigManager.EndpointConfig endpointConfig,
-                                    SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) {
+    public SparkRunningJobParseBolt(SparkRunningJobAppConfig.ZKStateConfig zkStateConfig,
+                                    SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
+                                    SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+                                    SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig) {
         this.zkStateConfig = zkStateConfig;
         this.eagleServiceConfig = eagleServiceConfig;
         this.endpointConfig = endpointConfig;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
new file mode 100644
index 0000000..24cf09e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>SPARK_RUNNING_JOB_APP</type>
+    <name>Spark Running Job Monitoring</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.jpm.spark.running.SparkRunningJobApp</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
+        <property>
+            <name>envContextConfig.env</name>
+            <value>local</value>
+            <displayName>Environment</displayName>
+            <description>Execution environment</description>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkQuorum</name>
+            <displayName>zkQuorum</displayName>
+            <description>Zookeeper Quorum</description>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkPort</name>
+            <displayName>zkPort</displayName>
+            <description>Zookeeper Port</description>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkSessionTimeoutMs</name>
+            <displayName>zkSessionTimeoutMs</displayName>
+            <description>Zookeeper session timeoutMs</description>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryTimes</name>
+            <displayName>zkRetryTimes</displayName>
+            <description>zookeeperConfig.zkRetryTimes</description>
+            <value>3</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryInterval</name>
+            <displayName>zkRetryInterval</displayName>
+            <description>zookeeperConfig.zkRetryInterval</description>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRoot</name>
+            <value>/apps/spark/running</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.recoverEnabled</name>
+            <description>zookeeperConfig.recoverEnabled</description>
+            <value>false</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.readTimeOutSeconds</name>
+            <description>eagleProps.eagleService.readTimeOutSeconds</description>
+            <value>20</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.maxFlushNum</name>
+            <description>eagleProps.eagleService.maxFlushNum</description>
+            <value>500</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.site</name>
+            <description>jobExtractorConfig.site</description>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.fetchRunningJobInterval</name>
+            <description>jobExtractorConfig.fetchRunningJobInterval</description>
+            <value>15</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.parseThreadPoolSize</name>
+            <description>jobExtractorConfig.parseThreadPoolSize</description>
+            <value>5</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.eventLog</name>
+            <description>dataSourceConfig.eventLog</description>
+            <value>/spark-history</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.nnEndpoint</name>
+            <description>dataSourceConfig.nnEndpoint</description>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.keytab</name>
+            <description>dataSourceConfig.keytab</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.principal</name>
+            <description>dataSourceConfig.principal</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.rmUrls</name>
+            <description>dataSourceConfig.rmUrls</description>
+            <value>http://sandbox.hortonworks.com:8088</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.sparkRunningJobFetchSpout</name>
+            <description>Parallelism of sparkRunningJobFetchSpout </description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.sparkRunningJobFetchSpout</name>
+            <description>Tasks Num of sparkRunningJobFetchSpout </description>
+            <value>4</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.sparkRunningJobParseBolt</name>
+            <description>Parallelism of sparkRunningJobParseBolt </description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.sparkRunningJobParseBolt</name>
+            <description>Tasks Num of sparkRunningJobParseBolt</description>
+            <value>4</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+            # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+            ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+            # Step 2: Set up data collector to flow data into the kafka topic
+
+            ./bin/logstash -f log_collector.conf
+
+            ## `log_collector.conf` sample as following:
+
+            input {
+
+            }
+            filter {
+
+            }
+            output{
+
+            }
+
+            # Step 3: start application
+
+            # Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+            # Step 1: stop and uninstall application
+            # Step 2: delete kafka topic named "${site}_example_source_topic"
+            # Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..6aef879
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index d93a135..9d9f622 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -15,8 +15,6 @@
 
 {
   "envContextConfig" : {
-    "env" : "local",
-    "topologyName" : "sparkRunningJob",
     "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "sparkRunningJobFetchSpout" : 1,
@@ -28,15 +26,13 @@
     },
     "workers" : 2
   },
-
   "jobExtractorConfig" : {
     "site" : "sandbox",
     "fetchRunningJobInterval" : 15,
     "parseThreadPoolSize" : 5
   },
-
   "dataSourceConfig" : {
-    "rmUrls": ["http://sandbox.hortonworks.com:8088"],
+    "rmUrls": "http://sandbox.hortonworks.com:8088",
     "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
     "principal" : "", #if not need, then empty
     "keytab" : "",
@@ -52,7 +48,8 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
-
+  "appId":"sparkRunningJob",
+  "mode":"LOCAL",
   "eagleProps" : {
     "mailHost" : "abc.com",
     "mailDebug" : "true",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
new file mode 100644
index 0000000..346171a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider;
+import org.junit.Test;
+
+public class SparkRunningJobAppProviderTest extends ApplicationTestBase {
+    @Inject
+    ApplicationSimulator simulator;
+
+    @Test
+    public void testRunWithProvider(){
+        simulator.start(SparkRunningJobAppProvider.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 04589f9..72d0ad7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1190,6 +1190,7 @@
                                 <exclude>README*</exclude>
                                 <exclude>**/*.log</exclude>
                                 <exclude>**/*.out</exclude>
+                                <exclude>**/*.db</exclude>
                                 <exclude>**/eagle.log*</exclude>
                                 <exclude>**/velocity.log*</exclude>
                                 <!-- all json files should be excluded -->


Mime
View raw message