Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E56CB200BC8 for ; Wed, 19 Oct 2016 05:27:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E39C6160AF7; Wed, 19 Oct 2016 03:27:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 740B3160AE5 for ; Wed, 19 Oct 2016 05:27:39 +0200 (CEST) Received: (qmail 36772 invoked by uid 500); 19 Oct 2016 03:27:37 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 36689 invoked by uid 99); 19 Oct 2016 03:27:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 03:27:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 06D3E18068A for ; Wed, 19 Oct 2016 03:27:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id uTu4OZuCl4Lr for ; Wed, 19 Oct 2016 03:27:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 9EFD15FC32 for ; Wed, 19 Oct 2016 03:27:17 +0000 (UTC) Received: (qmail 33044 invoked by uid 99); 19 Oct 2016 03:27:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 03:27:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 251CBF12ED; Wed, 19 Oct 2016 03:27:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Wed, 19 Oct 2016 03:27:27 -0000 Message-Id: In-Reply-To: <679f7884b2ff4acc87cbf4e51a1de5db@git.apache.org> References: <679f7884b2ff4acc87cbf4e51a1de5db@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] incubator-eagle git commit: [EAGLE-556] Install/Update Alert Topology Metadata when start alert engine archived-at: Wed, 19 Oct 2016 03:27:42 -0000 [EAGLE-556] Install/Update Alert Topology Metadata when start alert engine Currently alert engine requires additional metadata like TopologyMeta, which in fact could be automatically generated during starting alert engine. *Changes* * Add "ApplicationListener" in ApplicationProvider to support extensible application lifecycle management callback listener * Implement AlertUnitTopologyAppListener to add topology metadata when topology is running and remove topology metadata when topology is stopped Author: Hao Chen Closes #493 from haoch/EAGLE-556. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fc2407cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fc2407cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fc2407cd Branch: refs/heads/master Commit: fc2407cd53438a31b4c1072ce7d5b08578b8e43d Parents: fa85dc3 Author: Hao Chen Authored: Wed Oct 12 12:10:06 2016 +0800 Committer: Hao Chen Committed: Wed Oct 12 12:10:06 2016 +0800 ---------------------------------------------------------------------- .../alert/app/AlertUnitTopologyAppListener.java | 90 ++++++++++ .../alert/app/AlertUnitTopologyAppProvider.java | 10 +- ...e.alert.app.AlertUnitTopologyAppProvider.xml | 4 +- .../alert/engine/runner/UnitTopologyRunner.java | 164 ++++++++++++------ .../apache/eagle/app/ApplicationLifecycle.java | 23 ++- .../environment/ExecutionRuntimeManager.java | 4 +- .../environment/impl/StormExecutionRuntime.java | 21 ++- .../eagle/app/service/ApplicationAction.java | 161 ++++++++++++++++++ .../eagle/app/service/ApplicationListener.java | 32 ++++ .../service/ApplicationOperationContext.java | 167 ------------------- .../impl/ApplicationManagementServiceImpl.java | 110 +++++++----- .../impl/ApplicationProviderConfigLoader.java | 1 - .../impl/ApplicationProviderSPILoader.java | 1 - .../apache/eagle/app/sink/KafkaStreamSink.java | 4 +- .../eagle/app/sink/LoggingStreamSink.java | 8 +- .../app/spi/AbstractApplicationProvider.java | 52 +++++- .../eagle/app/spi/ApplicationProvider.java | 19 +-- .../app/test/ApplicationSimulatorImpl.java | 1 - .../apache/eagle/app/TestStormApplication.java | 9 +- .../app/service/ApplicationActionTest.java | 48 ++++++ .../ApplicationOperationContextTest.java | 48 ------ .../eagle/metadata/model/ApplicationEntity.java | 6 + .../app/example/ExampleApplicationProvider.java | 55 ++++-- .../hbase/HBaseAuditLogAppProvider.java | 5 - ...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 8 +- 25 files changed, 672 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java new file mode 100644 index 0000000..fd3f9d2 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppListener.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.app; + +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.runner.UnitTopologyRunner; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.alert.metadata.resource.OpResult; +import org.apache.eagle.app.service.ApplicationListener; +import org.apache.eagle.metadata.model.ApplicationEntity; +import com.typesafe.config.ConfigFactory; + +import com.google.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AlertUnitTopologyAppListener implements ApplicationListener { + private static final Logger LOG = LoggerFactory.getLogger(AlertUnitTopologyAppListener.class); + + @Inject private IMetadataDao metadataDao; + + private ApplicationEntity applicationEntity; + + @Override + public void init(ApplicationEntity applicationEntity) { + this.applicationEntity = applicationEntity; + } + + @Override + public void afterInstall() { + // Do nothing + } + + @Override + public void afterUninstall() { + removeTopologyMetadata(); + } + + @Override + public void beforeStart() { + // Do thing, may do some validation works? + updateTopologyMetadata(); + } + + @Override + public void afterStop() { + removeTopologyMetadata(); + } + + // ------------- + // Internal RPC + // ------------- + + private void updateTopologyMetadata() { + LOG.info("Update topology metadata {}", this.applicationEntity.getAppId()); + OpResult result = metadataDao.addTopology(createTopologyMeta(this.applicationEntity)); + if (result.code == OpResult.FAILURE) { + LOG.error(result.message); + throw new IllegalStateException(result.message); + } + } + + private void removeTopologyMetadata() { + LOG.info("Remove topology metadata {}", this.applicationEntity.getAppId()); + OpResult result = metadataDao.removeTopology(createTopologyMeta(this.applicationEntity).getName()); + if (result.code == OpResult.FAILURE) { + LOG.error(result.message); + throw new IllegalStateException(result.message); + } + } + + private Topology createTopologyMeta(ApplicationEntity applicationEntity) { + return UnitTopologyRunner.buildTopologyMetadata(applicationEntity.getAppId(),ConfigFactory.parseMap(applicationEntity.getConfiguration())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java index 39a4583..5548c8c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProvider.java @@ -16,8 +16,11 @@ */ package org.apache.eagle.alert.app; +import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; +import java.util.Optional; + /** * since 8/25/16. */ @@ -26,4 +29,9 @@ public class AlertUnitTopologyAppProvider extends AbstractApplicationProvider getApplicationListener() { + return Optional.of(new AlertUnitTopologyAppListener()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml index 498cb8d..bf22123 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml @@ -65,7 +65,7 @@ spout.kafkaBrokerZkQuorum spout.kafkaBrokerZkQuorum - server.eagle.apache.org:2181 + localhost:2181 zookeeper quorum for spout to consume data @@ -97,7 +97,7 @@ zkConfig.zkQuorum zkConfig.zkQuorum - server.eagle.apache.org:2181 + localhost:2181 zk quorum for alert engine http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index 7a93e72..ec129fe 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.runner; +import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; import org.apache.eagle.alert.engine.spout.CorrelationSpout; @@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.TreeSet; /** * By default @@ -76,20 +79,78 @@ public class UnitTopologyRunner { this.givenStormConfig = stormConfig; } + // ----------------------------- + // Storm Topology Submit Helper + // ----------------------------- + + private void run(String topologyId, + int numOfTotalWorkers, + int numOfSpoutTasks, + int numOfRouterBolts, + int numOfAlertBolts, + int numOfPublishTasks, + Config config, + boolean localMode) { + + backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig; + // TODO: Configurable metric consumer instance number + + int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS; + LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs); + stormConfig.setMessageTimeoutSecs(messageTimeoutSecs); + + if (config.hasPath("metric")) { + stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1); + } + + stormConfig.setNumWorkers(numOfTotalWorkers); + StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + + if (localMode) { + LOG.info("Submitting as local mode"); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology(topologyId, stormConfig, topology); + Utils.sleep(Long.MAX_VALUE); + } else { + LOG.info("Submitting as cluster mode"); + try { + StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology); + } catch (Exception ex) { + LOG.error("fail submitting topology {}", topology, ex); + throw new IllegalStateException(ex); + } + } + } + + public void run(String topologyId, Config config) { + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); + int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); + boolean localMode = config.getBoolean(LOCAL_MODE); + int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); + run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode); + } + + public IMetadataChangeNotifyService getMetadataChangeNotifyService() { + return metadataChangeNotifyService; + } + + // --------------------------- + // Build Storm Topology + // --------------------------- + public StormTopology buildTopology(String topologyId, int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, int numOfPublishTasks, Config config) { - StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts]; AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts]; - TopologyBuilder builder = new TopologyBuilder(); - // construct Spout object CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix); builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); @@ -147,63 +208,66 @@ public class UnitTopologyRunner { } public StormTopology buildTopology(String topologyId, Config config) { - int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks"); - int numOfRouterBolts = config.getInt("topology.numOfRouterBolts"); - int numOfAlertBolts = config.getInt("topology.numOfAlertBolts"); - int numOfPublishTasks = config.getInt("topology.numOfPublishTasks"); + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); + int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); + return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); } - private void run(String topologyId, - int numOfTotalWorkers, - int numOfSpoutTasks, - int numOfRouterBolts, - int numOfAlertBolts, - int numOfPublishTasks, - Config config, - boolean localMode) { + // --------------------------- + // Build Topology Metadata + // --------------------------- - backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig; - // TODO: Configurable metric consumer instance number + public static Topology buildTopologyMetadata(String topologyId, Config config) { + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); + int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS; - LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs); - stormConfig.setMessageTimeoutSecs(messageTimeoutSecs); + return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + } - if (config.hasPath("metric")) { - stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1); + public static Topology buildTopologyMetadata(String topologyId, + int numOfSpoutTasks, + int numOfRouterBolts, + int numOfAlertBolts, + int numOfPublishTasks, + Config config) { + Topology topology = new Topology(); + topology.setName(topologyId); + topology.setNumOfSpout(numOfSpoutTasks); + topology.setNumOfAlertBolt(numOfAlertBolts); + topology.setNumOfGroupBolt(numOfRouterBolts); + topology.setNumOfPublishBolt(numOfPublishTasks); + + // Set Spout ID + topology.setSpoutId(spoutName); + + // Set Router (Group) ID + Set streamRouterBoltNames = new TreeSet<>(); + for (int i = 0; i < numOfRouterBolts; i++) { + streamRouterBoltNames.add(streamRouterBoltNamePrefix + i); } + topology.setGroupNodeIds(streamRouterBoltNames); - stormConfig.setNumWorkers(numOfTotalWorkers); - StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); - - if (localMode) { - LOG.info("Submitting as local mode"); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topologyId, stormConfig, topology); - Utils.sleep(Long.MAX_VALUE); - } else { - LOG.info("Submitting as cluster mode"); - try { - StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology); - } catch (Exception ex) { - LOG.error("fail submitting topology {}", topology, ex); - throw new IllegalStateException(ex); - } + // Set Alert Bolt ID + Set alertBoltIds = new TreeSet<>(); + for (int i = 0; i < numOfAlertBolts; i++) { + alertBoltIds.add(alertBoltNamePrefix + i); } - } + topology.setAlertBoltIds(alertBoltIds); - public void run(String topologyId, Config config) { - int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); - int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); - int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); - int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - boolean localMode = config.getBoolean(LOCAL_MODE); - int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); - run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode); - } + // Set Publisher ID + topology.setPubBoltId(alertPublishBoltName); - public IMetadataChangeNotifyService getMetadataChangeNotifyService() { - return metadataChangeNotifyService; + // TODO: Load bolts' parallelism from configuration, currently keep 1 by default. + + topology.setSpoutParallelism(1); + topology.setGroupParallelism(1); + topology.setAlertParallelism(1); + + return topology; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java index 94b6195..e482615 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycle.java @@ -16,28 +16,35 @@ */ package org.apache.eagle.app; +/** + * Application Lifecycle Listener/Callback. + */ public interface ApplicationLifecycle { /** - * on application installed. + * After Application Installed. (Callback) */ - default void onInstall() { + default void afterInstall() { + // Do nothing by default } /** - * on application uninstalled. + * After Application Uninstalled. (Callback) */ - default void onUninstall() { + default void afterUninstall() { + // Do nothing by default } /** - * onStart. + * Before Application Start. (Prepare) */ - default void onStart() { + default void beforeStart() { + // Do nothing by default } /** - * onStop. + * After Application Stopped. (Callback) */ - default void onStop() { + default void afterStop() { + // Do nothing by default } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java index 96f171e..d51b8b5 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntimeManager.java @@ -64,7 +64,9 @@ public class ExecutionRuntimeManager { ExecutionRuntime runtime = ((ExecutionRuntimeProvider) executionRuntimeProviders.get(environment.getClass())).get(); runtime.prepare(environment); executionRuntimeCache.put(environment, runtime); - LOGGER.info("Created new execution runtime {} for environment: {}", runtime, environment); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Created new execution runtime {} for environment: {}", runtime, environment); + } return runtime; } else { LOGGER.error("No matched execution runtime found for environment: " + environment); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java index 1994e28..fb4aff9 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java @@ -137,7 +137,7 @@ public class StormExecutionRuntime implements ExecutionRuntime executor, com.typesafe.config.Config config) { String appId = config.getString("appId"); - LOG.info("Stopping topology {} ..." + appId); + LOG.info("Stopping topology {} ...", appId); if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) { Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient(); try { @@ -151,7 +151,7 @@ public class StormExecutionRuntime implements ExecutionRuntime topologySummaries ; + ApplicationEntity.Status status = null; try { if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) { Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient(); @@ -166,22 +167,28 @@ public class StormExecutionRuntime implements ExecutionRuntime { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java new file mode 100644 index 0000000..cecd81e --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.service; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.scheme.JsonScheme; +import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.app.Application; +import org.apache.eagle.app.environment.ExecutionRuntime; +import org.apache.eagle.app.environment.ExecutionRuntimeManager; +import org.apache.eagle.app.sink.KafkaStreamSinkConfig; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.model.StreamDesc; +import org.apache.eagle.metadata.model.StreamSinkConfig; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * Managed Application Action: org.apache.eagle.app.service.ApplicationAction + *

    + *
  • Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity
  • + *
  • Application Processing Logic (Execution): org.apache.eagle.app.Application
  • + *
  • Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycle
  • + *
+ */ +public class ApplicationAction implements Serializable { + private final Config config; + private final Application application; + private final ExecutionRuntime runtime; + private final ApplicationEntity metadata; + private final IMetadataDao alertMetadataService; + + /** + * @param metadata ApplicationEntity. + * @param application Application. + */ + public ApplicationAction(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService) { + Preconditions.checkNotNull(application, "Application is null"); + Preconditions.checkNotNull(metadata, "ApplicationEntity is null"); + this.application = application; + this.metadata = metadata; + this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), envConfig); + Map executionConfig = metadata.getConfiguration(); + if (executionConfig == null) { + executionConfig = Collections.emptyMap(); + } + + // TODO: Decouple hardcoded configuration key + executionConfig.put("siteId", metadata.getSite().getSiteId()); + executionConfig.put("mode", metadata.getMode().name()); + executionConfig.put("appId", metadata.getAppId()); + executionConfig.put("jarPath", metadata.getJarPath()); + this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig); + this.alertMetadataService = alertMetadataService; + } + + /** + * Generate global unique streamId to install. + * TODO refactor with streamId and siteId + */ + private static String generateUniqueStreamId(String siteId,String streamTypeId) { + return String.format("%s_%s",streamTypeId,siteId).toUpperCase(); + } + + public void doInstall() { + if (metadata.getDescriptor().getStreams() != null) { + List streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { + StreamDefinition copied = streamDefinition.copy(); + copied.setSiteId(metadata.getSite().getSiteId()); + copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId())); + StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.config); + StreamDesc streamDesc = new StreamDesc(); + streamDesc.setSchema(copied); + streamDesc.setSink(streamSinkConfig); + streamDesc.setStreamId(copied.getStreamId()); + return streamDesc; + })).collect(Collectors.toList()); + metadata.setStreams(streamDescToInstall); + + // TODO: Decouple converting from StreamSink to Alert DataSource + // iterate each stream descriptor and create alert datasource for each + for (StreamDesc streamDesc : streamDescToInstall) { + // only take care of Kafka sink + if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) { + KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink(); + Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); + datasource.setType("KAFKA"); + datasource.setName(metadata.getAppId()); + datasource.setTopic(kafkaCfg.getTopicId()); + datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); + Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); + Properties prop = new Properties(); + prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); + tuple2Stream.setStreamNameSelectorProp(prop); + tuple2Stream.setTimestampColumn("timestamp"); + tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName()); + datasource.setCodec(tuple2Stream); + alertMetadataService.addDataSource(datasource); + + StreamDefinition sd = streamDesc.getSchema(); + sd.setDataSource(metadata.getAppId()); + alertMetadataService.createStream(streamDesc.getSchema()); + } + } + } + } + + public void doUninstall() { + // we should remove alert data source and stream definition while we do uninstall + if (metadata.getStreams() == null) { + return; + } + // iterate each stream descriptor and create alert datasource for each + for (StreamDesc streamDesc : metadata.getStreams()) { + alertMetadataService.removeDataSource(metadata.getAppId()); + alertMetadataService.removeStream(streamDesc.getStreamId()); + } + } + + public void doStart() { + this.runtime.start(this.application, this.config); + } + + @SuppressWarnings("unchecked") + public void doStop() { + this.runtime.stop(this.application, this.config); + } + + public ApplicationEntity.Status getStatus() { + return this.runtime.status(this.application, this.config); + } + + public ApplicationEntity getMetadata() { + return metadata; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java new file mode 100644 index 0000000..4a7f0c6 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.service; + +import org.apache.eagle.app.ApplicationLifecycle; +import org.apache.eagle.metadata.model.ApplicationEntity; + +/** + * Application Lifecycle/Management Listener (Guice Aware). + * Currently only listen on application lifecycle , may extend to more later. + */ +public interface ApplicationListener extends ApplicationLifecycle { + /** + * @param applicationEntity ApplicationEntity. + */ + void init(ApplicationEntity applicationEntity); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java deleted file mode 100644 index 3561374..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperationContext.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.service; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.scheme.JsonScheme; -import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.app.Application; -import org.apache.eagle.app.ApplicationLifecycle; -import org.apache.eagle.app.environment.ExecutionRuntime; -import org.apache.eagle.app.environment.ExecutionRuntimeManager; -import org.apache.eagle.app.sink.KafkaStreamSinkConfig; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.metadata.model.StreamDesc; -import org.apache.eagle.metadata.model.StreamSinkConfig; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -/** - * Managed Application Interface: org.apache.eagle.app.service.ApplicationOperationContext - *

    - *
  • Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity
  • - *
  • Application Processing Logic (Execution): org.apache.eagle.app.Application
  • - *
  • Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycle
  • - *
- */ -public class ApplicationOperationContext implements Serializable, ApplicationLifecycle { - private final Config config; - private final Application application; - private final ExecutionRuntime runtime; - private final ApplicationEntity metadata; - private final IMetadataDao alertMetadataService; - - /** - * @param metadata ApplicationEntity. - * @param application Application. - */ - public ApplicationOperationContext(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService) { - Preconditions.checkNotNull(application, "Application is null"); - Preconditions.checkNotNull(metadata, "ApplicationEntity is null"); - this.application = application; - this.metadata = metadata; - this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), envConfig); - Map executionConfig = metadata.getConfiguration(); - if (executionConfig == null) { - executionConfig = Collections.emptyMap(); - } - - // TODO: Decouple hardcoded configuration key - executionConfig.put("siteId", metadata.getSite().getSiteId()); - executionConfig.put("mode", metadata.getMode().name()); - executionConfig.put("appId", metadata.getAppId()); - executionConfig.put("jarPath", metadata.getJarPath()); - this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig); - this.alertMetadataService = alertMetadataService; - } - - /** - * Generate global unique streamId to install. - * TODO refactor with streamId and siteId - */ - private static String generateUniqueStreamId(String siteId,String streamTypeId) { - return String.format("%s_%s",streamTypeId,siteId).toUpperCase(); - } - - @Override - public void onInstall() { - if (metadata.getDescriptor().getStreams() != null) { - List streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { - StreamDefinition copied = streamDefinition.copy(); - copied.setSiteId(metadata.getSite().getSiteId()); - copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId())); - StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.config); - StreamDesc streamDesc = new StreamDesc(); - streamDesc.setSchema(copied); - streamDesc.setSink(streamSinkConfig); - streamDesc.setStreamId(copied.getStreamId()); - return streamDesc; - })).collect(Collectors.toList()); - metadata.setStreams(streamDescToInstall); - - // TODO: Decouple converting from StreamSink to Alert DataSource - // iterate each stream descriptor and create alert datasource for each - for (StreamDesc streamDesc : streamDescToInstall) { - // only take care of Kafka sink - if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) { - KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink(); - Kafka2TupleMetadata datasource = new Kafka2TupleMetadata(); - datasource.setType("KAFKA"); - datasource.setName(metadata.getAppId()); - datasource.setTopic(kafkaCfg.getTopicId()); - datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); - Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); - Properties prop = new Properties(); - prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); - tuple2Stream.setStreamNameSelectorProp(prop); - tuple2Stream.setTimestampColumn("timestamp"); - tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName()); - datasource.setCodec(tuple2Stream); - alertMetadataService.addDataSource(datasource); - - StreamDefinition sd = streamDesc.getSchema(); - sd.setDataSource(metadata.getAppId()); - alertMetadataService.createStream(streamDesc.getSchema()); - } - } - } - } - - @Override - public void onUninstall() { - // we should remove alert data source and stream definition while we do uninstall - if (metadata.getStreams() == null) { - return; - } - // iterate each stream descriptor and create alert datasource for each - for (StreamDesc streamDesc : metadata.getStreams()) { - alertMetadataService.removeDataSource(metadata.getAppId()); - alertMetadataService.removeStream(streamDesc.getStreamId()); - } - } - - @SuppressWarnings("unchecked") - @Override - public void onStart() { - this.runtime.start(this.application, this.config); - } - - @SuppressWarnings("unchecked") - @Override - public void onStop() { - this.runtime.stop(this.application, this.config); - } - - public ApplicationEntity.Status getStatus() { - return this.runtime.status(this.application, this.config); - } - - public ApplicationEntity getMetadata() { - return metadata; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java index abd9197..6dee1fc 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java @@ -18,14 +18,12 @@ package org.apache.eagle.app.service.impl; import com.google.common.base.Preconditions; import com.google.inject.Inject; +import com.google.inject.Injector; import com.google.inject.Singleton; import com.typesafe.config.Config; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.app.Application; -import org.apache.eagle.app.service.ApplicationManagementService; -import org.apache.eagle.app.service.ApplicationOperations; -import org.apache.eagle.app.service.ApplicationOperationContext; -import org.apache.eagle.app.service.ApplicationProviderService; +import org.apache.eagle.app.service.*; import org.apache.eagle.app.spi.ApplicationProvider; import org.apache.eagle.metadata.exceptions.ApplicationWrongStatusException; import org.apache.eagle.metadata.exceptions.EntityNotFoundException; @@ -41,12 +39,15 @@ import java.util.Map; @Singleton public class ApplicationManagementServiceImpl implements ApplicationManagementService { + private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class); + private final SiteEntityService siteEntityService; private final ApplicationProviderService applicationProviderService; private final ApplicationEntityService applicationEntityService; private final IMetadataDao alertMetadataService; private final Config config; - private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class); + + @Inject private Injector currentInjector; @Inject public ApplicationManagementServiceImpl( @@ -99,13 +100,26 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe } applicationEntity.setConfiguration(appConfig); + // Validate Dependency validateDependingApplicationInstalled(applicationEntity); - ApplicationOperationContext applicationOperationContext = new ApplicationOperationContext( - applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity, config, alertMetadataService); - applicationOperationContext.onInstall(); - return applicationEntityService.create(applicationEntity); + ApplicationProvider applicationProvider = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()); + + // DoInstall + ApplicationAction applicationAction = new ApplicationAction(applicationProvider.getApplication(), applicationEntity, config, alertMetadataService); + applicationAction.doInstall(); + + // UpdateMetadata + ApplicationEntity result = applicationEntityService.create(applicationEntity); + + // AfterInstall Callback + applicationProvider.getApplicationListener().ifPresent((listener) -> { + currentInjector.injectMembers(listener); + listener.init(result); + listener.afterInstall(); + }); + + return result; } private void validateDependingApplicationInstalled(ApplicationEntity applicationEntity) { @@ -120,18 +134,23 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe @Override public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) throws ApplicationWrongStatusException { - ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(), operation.getAppId()); - ApplicationOperationContext applicationOperationContext = new ApplicationOperationContext( - applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(), - applicationEntity, config, alertMetadataService); + ApplicationEntity appEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(), operation.getAppId()); + ApplicationProvider appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType()); - ApplicationEntity.Status currentStatus = applicationEntity.getStatus(); + ApplicationAction appAction = new ApplicationAction(appProvider.getApplication(), appEntity, config, alertMetadataService); + ApplicationEntity.Status currentStatus = appEntity.getStatus(); try { if (currentStatus == ApplicationEntity.Status.INITIALIZED || currentStatus == ApplicationEntity.Status.STOPPED) { - applicationOperationContext.onUninstall(); - return applicationEntityService.delete(applicationEntity); + // AfterUninstall Callback + appAction.doUninstall(); + appProvider.getApplicationListener().ifPresent((listener) -> { + currentInjector.injectMembers(listener); + listener.init(appEntity); + listener.afterUninstall(); + }); + return applicationEntityService.delete(appEntity); } else { - throw new ApplicationWrongStatusException("App: " + applicationEntity.getAppId() + " status is" + currentStatus + ", uninstall operation is not allowed"); + throw new ApplicationWrongStatusException("App: " + appEntity.getAppId() + " status is" + currentStatus + ", uninstall operation is not allowed"); } } catch (Throwable throwable) { LOGGER.error(throwable.getMessage(), throwable); @@ -141,49 +160,59 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe @Override public ApplicationEntity start(ApplicationOperations.StartOperation operation) throws ApplicationWrongStatusException { - ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(), operation.getAppId()); - Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(); + ApplicationEntity appEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(), operation.getAppId()); + ApplicationProvider appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType()); + Application application = appProvider.getApplication(); Preconditions.checkArgument(application.isExecutable(), "Application is not executable"); - ApplicationEntity.Status currentStatus = applicationEntity.getStatus(); + ApplicationEntity.Status currentStatus = appEntity.getStatus(); try { if (currentStatus == ApplicationEntity.Status.INITIALIZED || currentStatus == ApplicationEntity.Status.STOPPED) { - ApplicationOperationContext applicationOperationContext = new ApplicationOperationContext( - application, applicationEntity, config, alertMetadataService); - - applicationOperationContext.onStart(); - //Only when topology submitted successfully can the state change to STARTING - applicationEntityService.delete(applicationEntity); - applicationEntity.setStatus(ApplicationEntity.Status.STARTING); - return applicationEntityService.create(applicationEntity); + ApplicationAction applicationAction = new ApplicationAction(application, appEntity, config, alertMetadataService); + // AfterInstall Callback + appProvider.getApplicationListener().ifPresent((listener) -> { + currentInjector.injectMembers(listener); + listener.init(appEntity); + listener.beforeStart(); + }); + applicationAction.doStart(); + + //TODO: Only when topology submitted successfully can the state change to STARTING + applicationEntityService.delete(appEntity); + appEntity.setStatus(ApplicationEntity.Status.STARTING); + return applicationEntityService.create(appEntity); } else { - throw new ApplicationWrongStatusException("App: " + applicationEntity.getAppId() + " status is " + currentStatus + " start operation is not allowed"); + throw new ApplicationWrongStatusException("App: " + appEntity.getAppId() + " status is " + currentStatus + " start operation is not allowed"); } } catch (ApplicationWrongStatusException e) { LOGGER.error(e.getMessage(), e); throw e; } catch (Exception e) { - LOGGER.error("Failed to start app " + applicationEntity.getAppId(), e); + LOGGER.error("Failed to start app " + appEntity.getAppId(), e); throw e; } catch (Throwable throwable) { LOGGER.error(throwable.getMessage(), throwable); throw throwable; } - } @Override public ApplicationEntity stop(ApplicationOperations.StopOperation operation) throws ApplicationWrongStatusException { ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(), operation.getAppId()); - Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(); + ApplicationProvider appProvider = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()); + Application application = appProvider.getApplication(); Preconditions.checkArgument(application.isExecutable(), "Application is not executable"); - ApplicationOperationContext applicationOperationContext = new ApplicationOperationContext( - application, applicationEntity, config, alertMetadataService); + ApplicationAction applicationAction = new ApplicationAction(application, applicationEntity, config, alertMetadataService); ApplicationEntity.Status currentStatus = applicationEntity.getStatus(); try { if (currentStatus == ApplicationEntity.Status.RUNNING) { - applicationOperationContext.onStop(); + applicationAction.doStop(); + appProvider.getApplicationListener().ifPresent((listener) -> { + currentInjector.injectMembers(listener); + listener.init(applicationEntity); + listener.afterStop(); + }); //stop -> directly killed applicationEntityService.delete(applicationEntity); applicationEntity.setStatus(ApplicationEntity.Status.STOPPING); @@ -210,14 +239,11 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(); Preconditions.checkArgument(application.isExecutable(), "Application is not executable"); - ApplicationOperationContext applicationOperationContext = new ApplicationOperationContext( - application, applicationEntity, config, alertMetadataService); - ApplicationEntity.Status topologyStatus = applicationOperationContext.getStatus(); - return topologyStatus; + ApplicationAction applicationAction = new ApplicationAction(application, applicationEntity, config, alertMetadataService); + return applicationAction.getStatus(); } catch (IllegalArgumentException e) { LOGGER.error("application id not exist", e); throw e; } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java index 025bc7c..1455922 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java @@ -99,7 +99,6 @@ public class ApplicationProviderConfigLoader extends ApplicationProviderLoader { throw new RuntimeException("providerClassName is not implementation of " + ApplicationProvider.class.getCanonicalName()); } ApplicationProvider provider = (ApplicationProvider) providerClass.newInstance(); - provider.prepare(providerConfig, this.getConfig()); Preconditions.checkNotNull(provider.getApplicationDesc(), "appDesc is null"); Preconditions.checkNotNull(provider.getApplicationDesc().getType(), "type is null"); registerProvider(provider); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java index 9f52c9c..cbbd438 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java @@ -81,7 +81,6 @@ public class ApplicationProviderSPILoader extends ApplicationProviderLoader { providerConfig.setClassName(applicationProvider.getClass().getCanonicalName()); providerConfig.setJarPath(jarFileSupplier.apply(applicationProvider)); applicationProvider.getApplicationDesc().setExecutable(applicationProvider.getApplication().isExecutable()); - applicationProvider.prepare(providerConfig, getConfig()); registerProvider(applicationProvider); LOG.warn("Loaded {}:{} ({}) from {}", applicationProvider.getApplicationDesc().getType(), http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java index 2a03275..2ac4779 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java @@ -65,7 +65,7 @@ public class KafkaStreamSink extends StormStreamSink { } @Override - public void onInstall() { + public void afterInstall() { ensureTopicCreated(); } @@ -85,7 +85,7 @@ public class KafkaStreamSink extends StormStreamSink { } @Override - public void onUninstall() { + public void afterUninstall() { ensureTopicDeleted(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java index 61a6836..8256aba 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java @@ -27,13 +27,13 @@ public class LoggingStreamSink extends StormStreamSink private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class); @Override - public void onInstall() { - LOGGER.info("Executing onInstall callback, do nothing"); + public void afterInstall() { + LOGGER.info("Executing afterInstall callback, do nothing"); } @Override - public void onUninstall() { - LOGGER.info("Executing onUninstall callback, do nothing"); + public void afterUninstall() { + LOGGER.info("Executing afterUninstall callback, do nothing"); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java index 06ac703..2a8d7c0 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java @@ -17,20 +17,31 @@ package org.apache.eagle.app.spi; +import com.google.common.base.Preconditions; +import com.google.inject.AbstractModule; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.app.Application; +import org.apache.eagle.app.service.ApplicationListener; +import org.apache.eagle.common.module.GlobalScope; import org.apache.eagle.common.module.ModuleRegistry; +import org.apache.eagle.common.module.ModuleScope; import org.apache.eagle.metadata.model.ApplicationDesc; import org.apache.eagle.metadata.model.ApplicationDocs; +import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.eagle.metadata.model.Configuration; +import org.apache.eagle.metadata.persistence.MetadataStore; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.xml.bind.JAXBException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Optional; /** - * Describe Application metadata with XML descriptor configuration in path of: /META-INF/providers/${ApplicationProviderClassName}.xml + * Describe Application metadata with XML descriptor configuration in path of: /META-INF/providers/${ApplicationProviderClassName}.xml. */ public abstract class AbstractApplicationProvider implements ApplicationProvider { private static final Logger LOG = LoggerFactory.getLogger(AbstractApplicationProvider.class); @@ -96,8 +107,43 @@ public abstract class AbstractApplicationProvider impleme return applicationDesc; } + private ModuleRegistry currentRegistry; + @Override - public void register(ModuleRegistry registry) { + public final void register(ModuleRegistry registry) { LOG.debug("Registering modules {}", this.getClass().getName()); + this.currentRegistry = registry; + onRegister(); + } + + @Override + public Optional getApplicationListener() { + return Optional.empty(); + } + + protected void onRegister() { + // Do nothing by default; + } + + protected void bind(Class scope, Class type, Class impl) { + Preconditions.checkNotNull(currentRegistry, "No registry set before being used"); + currentRegistry.register(scope, new AbstractModule() { + @Override + protected void configure() { + bind(type).to(impl); + } + }); + } + + public void bind(Class type, Class impl) { + bind(GlobalScope.class,type,impl); + } + + protected void bindToMetaStore(Class scope, Class type, Class impl) { + bind(scope,type,impl); + } + + public void bindToMemoryMetaStore(Class type, Class impl) { + bindToMetaStore(MemoryMetadataStore.class,type,impl); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java index 0dceb72..bc70373 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,15 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eagle.app.spi; -import com.typesafe.config.Config; import org.apache.eagle.app.Application; -import org.apache.eagle.app.config.ApplicationProviderConfig; +import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.common.module.ModuleRegistry; import org.apache.eagle.metadata.model.ApplicationDesc; import java.lang.reflect.ParameterizedType; +import java.util.Optional; /** * Application Service Provider Interface. @@ -32,13 +33,6 @@ import java.lang.reflect.ParameterizedType; public interface ApplicationProvider { /** - * Prepare Application Provider before loading. - */ - default void prepare(ApplicationProviderConfig providerConfig, Config envConfig) { - // Do nothing by default. - } - - /** * @return application descriptor. */ ApplicationDesc getApplicationDesc(); @@ -67,6 +61,11 @@ public interface ApplicationProvider { T getApplication(); /** + * @return application lifecycle listeners type. + */ + Optional getApplicationListener(); + + /** * Extend application modules like Web Resource, Metadata Store, etc. */ void register(ModuleRegistry registry); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index 2094d74..1abdb6b 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -83,7 +83,6 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator { public void start(Class appProviderClass, Map appConfig) { try { ApplicationProvider applicationProvider = appProviderClass.newInstance(); - applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass), appProviderClass), config); start(applicationProvider.getApplicationDesc().getType(), appConfig); } catch (InstantiationException | IllegalAccessException e) { throw new IllegalStateException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java index c6ac5db..a47e30a 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java @@ -72,13 +72,8 @@ public class TestStormApplication extends StormApplication{ } @Override - public void register(ModuleRegistry registry) { - registry.register(MemoryMetadataStore.class, new AbstractModule() { - @Override - protected void configure() { - bind(ExtendedDao.class).to(ExtendedDaoImpl.class); - } - }); + public void onRegister() { + bindToMemoryMetaStore(ExtendedDao.class,ExtendedDaoImpl.class); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java new file mode 100644 index 0000000..d076bf7 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationActionTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.service; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +public class ApplicationActionTest { + /** + * appConfig.withFallback(envConfig): appConfig will override envConfig, envConfig is used as default config + */ + @Test + public void testTypeSafeConfigMerge(){ + Config appConfig = ConfigFactory.parseMap(new HashMap(){{ + put("APP_CONFIG",ApplicationActionTest.this.getClass().getCanonicalName()); + put("SCOPE","APP"); + }}); + + Config envConfig = ConfigFactory.parseMap(new HashMap(){{ + put("ENV_CONFIG",ApplicationActionTest.this.getClass().getCanonicalName()); + put("SCOPE","ENV"); + }}); + + Config mergedConfig = appConfig.withFallback(envConfig); + Assert.assertTrue(mergedConfig.hasPath("APP_CONFIG")); + Assert.assertTrue(mergedConfig.hasPath("ENV_CONFIG")); + Assert.assertEquals("appConfig.withFallback(envConfig): appConfig will override envConfig, envConfig is used as default config", + "APP",mergedConfig.getString("SCOPE")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java deleted file mode 100644 index 21044be..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/service/ApplicationOperationContextTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.service; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; - -public class ApplicationOperationContextTest { - /** - * appConfig.withFallback(envConfig): appConfig will override envConfig, envConfig is used as default config - */ - @Test - public void testTypeSafeConfigMerge(){ - Config appConfig = ConfigFactory.parseMap(new HashMap(){{ - put("APP_CONFIG",ApplicationOperationContextTest.this.getClass().getCanonicalName()); - put("SCOPE","APP"); - }}); - - Config envConfig = ConfigFactory.parseMap(new HashMap(){{ - put("ENV_CONFIG",ApplicationOperationContextTest.this.getClass().getCanonicalName()); - put("SCOPE","ENV"); - }}); - - Config mergedConfig = appConfig.withFallback(envConfig); - Assert.assertTrue(mergedConfig.hasPath("APP_CONFIG")); - Assert.assertTrue(mergedConfig.hasPath("ENV_CONFIG")); - Assert.assertEquals("appConfig.withFallback(envConfig): appConfig will override envConfig, envConfig is used as default config", - "APP",mergedConfig.getString("SCOPE")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java index 5297de1..c6d01f4 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java @@ -39,6 +39,12 @@ public class ApplicationEntity extends PersistenceEntity { private List streams; private Mode mode = Mode.CLUSTER; private String jarPath; + + @Override + public String toString() { + return String.format("Application[appId=%s,siteId=%s,UUID=%s]", appId, descriptor.getType(), this.getUuid()); + } + private Status status = Status.INITIALIZED; public ApplicationEntity() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java index c207788..bedda04 100644 --- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java +++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java @@ -16,23 +16,25 @@ */ package org.apache.eagle.app.example; -import com.google.inject.AbstractModule; import org.apache.eagle.app.example.extensions.ExampleCommonService; import org.apache.eagle.app.example.extensions.ExampleCommonServiceImpl; import org.apache.eagle.app.example.extensions.ExampleEntityService; import org.apache.eagle.app.example.extensions.ExampleEntityServiceMemoryImpl; +import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; -import org.apache.eagle.common.module.GlobalScope; -import org.apache.eagle.common.module.ModuleRegistry; -import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; +import org.apache.eagle.metadata.model.ApplicationEntity; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.inject.Inject; + +import java.util.Optional; /** * Define application provider pragmatically */ public class ExampleApplicationProvider extends AbstractApplicationProvider { - private final static Logger LOGGER = LoggerFactory.getLogger(ExampleApplicationProvider.class); + private static final Logger LOG = LoggerFactory.getLogger(ExampleApplicationProvider.class); @Override public ExampleStormApplication getApplication() { @@ -40,21 +42,44 @@ public class ExampleApplicationProvider extends AbstractApplicationProvider getApplicationListener() { + return Optional.of(new ApplicationListener() { + + @Inject ExampleEntityService entityService; + + private ApplicationEntity application; + @Override - protected void configure() { - LOGGER.info("Load memory metadata modules ..."); - bind(ExampleEntityService.class).to(ExampleEntityServiceMemoryImpl.class); + public void init(ApplicationEntity applicationEntity) { + this.application = applicationEntity; + entityService.getEntities(); + } + + @Override + public void afterInstall() { + LOG.info("afterInstall {}", this.application); } - }); - registry.register(new AbstractModule() { @Override - protected void configure() { - LOGGER.info("Load global modules ..."); - bind(ExampleCommonService.class).to(ExampleCommonServiceImpl.class); + public void afterUninstall() { + LOG.info("afterUninstall {}", this.application); + } + + @Override + public void beforeStart() { + LOG.info("beforeStart {}", this.application); + } + + @Override + public void afterStop() { + LOG.info("afterStop {}", this.application); } }); } + + @Override + protected void onRegister() { + bindToMemoryMetaStore(ExampleEntityService.class,ExampleEntityServiceMemoryImpl.class); + bind(ExampleCommonService.class,ExampleCommonServiceImpl.class); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fc2407cd/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java index d9ae0ab..e788eb9 100644 --- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java +++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java @@ -32,11 +32,6 @@ public class HBaseAuditLogAppProvider extends AbstractApplicationProvider dataSourceConfig.zkConnection dataSourceConfig.zkConnection - server.eagle.apache.org + localhost zk connection dataSourceConfig.txZkServers dataSourceConfig.txZkServers - server.eagle.apache.org:2181 + localhost:2181 zookeeper server for offset transaction @@ -133,7 +133,7 @@ dataSinkConfig.brokerList dataSinkConfig.brokerList - server.eagle.apache.org:6667 + localhost:6667 kafka broker list @@ -153,7 +153,7 @@ fs.defaultFS fs.defaultFS - hdfs://server.eagle.apache.org:8020 + hdfs://localhost:8020 hdfs endpoint