Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DE7F3200BAA for ; Thu, 27 Oct 2016 12:31:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DD0F6160AF6; Thu, 27 Oct 2016 10:31:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 39BD2160AE4 for ; Thu, 27 Oct 2016 12:31:43 +0200 (CEST) Received: (qmail 37958 invoked by uid 500); 27 Oct 2016 10:31:42 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 37949 invoked by uid 99); 27 Oct 2016 10:31:42 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Oct 2016 10:31:42 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id D4416C7B8F for ; Thu, 27 Oct 2016 10:31:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 6M0sVIffh3zy for ; Thu, 27 Oct 2016 10:31:38 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id A24D55FD01 for ; Thu, 27 Oct 2016 10:31:37 +0000 (UTC) Received: (qmail 37922 invoked by uid 99); 27 Oct 2016 10:31:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Oct 2016 10:31:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C9635DFC55; Thu, 27 Oct 2016 10:31:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.incubator.apache.org Message-Id: <6cb4c0761c6e4e97ab0bc76f0498cede@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-681] Add new publisher AlertEagleStorePlugin Date: Thu, 27 Oct 2016 10:31:36 +0000 (UTC) archived-at: Thu, 27 Oct 2016 10:31:45 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 1add6d96f -> f07777706 [EAGLE-681] Add new publisher AlertEagleStorePlugin https://issues.apache.org/jira/browse/EAGLE-681 Author: Zhao, Qingwen Closes #573 from qingwen220/EAGLE-681. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f0777770 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f0777770 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f0777770 Branch: refs/heads/master Commit: f077777066c6bdc53bbe3004555ff3f24953ed25 Parents: 1add6d9 Author: Zhao, Qingwen Authored: Thu Oct 27 18:31:04 2016 +0800 Committer: Zhao, Qingwen Committed: Thu Oct 27 18:31:04 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/model/AlertPublishEvent.java | 107 +++++++++++++++++++ .../alert/engine/model/AlertStreamEvent.java | 32 ++++++ .../alert/service/IMetadataServiceClient.java | 10 ++ .../service/MetadataServiceClientImpl.java | 20 ++++ .../mock/InMemMetadataServiceClient.java | 17 +++ .../AbstractMetadataChangeNotifyService.java | 5 + .../impl/ZKMetadataChangeNotifyService.java | 6 ++ .../publisher/AlertPublishSpecListener.java | 3 + .../publisher/impl/AbstractPublishPlugin.java | 2 + .../publisher/impl/AlertEagleStorePlugin.java | 81 ++++++++++++++ .../alert/engine/runner/AlertPublisherBolt.java | 37 ++++++- .../integration/MockMetadataServiceClient.java | 16 +++ .../metadata/resource/MetadataResource.java | 25 +++++ .../eagle/alert/metadata/IMetadataDao.java | 5 + .../metadata/impl/InMemMetadataDaoImpl.java | 18 ++++ .../metadata/impl/JdbcMetadataDaoImpl.java | 11 ++ .../alert/metadata/impl/JdbcSchemaManager.java | 2 + .../metadata/impl/MongoMetadataDaoImpl.java | 12 +++ 18 files changed, 404 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java new file mode 100644 index 0000000..9d73c2d --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.model; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class AlertPublishEvent { + private String alertId; + private String siteId; + private List appIds; + private String policyId; + private String policyValue; + private long alertTimestamp; + private Map alertData; + + public static final String SITE_ID_KEY = "siteId"; + public static final String APP_IDS_KEY = "appIds"; + public static final String POLICY_VALUE_KEY = "policyValue"; + + public String getAlertId() { + return alertId; + } + + public void setAlertId(String alertId) { + this.alertId = alertId; + } + + public List getAppIds() { + return appIds; + } + + public void setAppIds(List appIds) { + this.appIds = appIds; + } + + public String getPolicyValue() { + return policyValue; + } + + public void setPolicyValue(String policyValue) { + this.policyValue = policyValue; + } + + public long getAlertTimestamp() { + return alertTimestamp; + } + + public void setAlertTimestamp(long alertTimestamp) { + this.alertTimestamp = alertTimestamp; + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + + public String getPolicyId() { + return policyId; + } + + public void setPolicyId(String policyId) { + this.policyId = policyId; + } + + public Map getAlertData() { + return alertData; + } + + public void setAlertData(Map alertData) { + this.alertData = alertData; + } + + public static AlertPublishEvent createAlertPublishEvent(AlertStreamEvent event) { + AlertPublishEvent alertEvent = new AlertPublishEvent(); + alertEvent.setAlertId(UUID.randomUUID().toString()); + alertEvent.setPolicyId(event.getPolicyId()); + if (event.getExtraData() != null) { + alertEvent.setSiteId(event.getExtraData().get(SITE_ID_KEY).toString()); + alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString()); + alertEvent.setAppIds((List) event.getExtraData().get(APP_IDS_KEY)); + } + alertEvent.setAlertData(event.getDataMap()); + return alertEvent; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java index ef3f71c..e502244 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java @@ -16,11 +16,14 @@ */ package org.apache.eagle.alert.engine.model; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.utils.DateTimeUtil; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * streamId stands for alert type instead of source event streamId. @@ -32,6 +35,8 @@ public class AlertStreamEvent extends StreamEvent { private StreamDefinition schema; private String createdBy; private long createdTime; + // app related fields + private Map extraData; public AlertStreamEvent() { } @@ -94,4 +99,31 @@ public class AlertStreamEvent extends StreamEvent { public void setCreatedTime(long createdTime) { this.createdTime = createdTime; } + + public Map getDataMap() { + Map event = new HashMap<>(); + for (StreamColumn column : schema.getColumns()) { + Object obj = this.getData()[schema.getColumnIndex(column.getName())]; + if (obj == null) { + event.put(column.getName(), null); + continue; + } + if (column.getName().equalsIgnoreCase("timestamp") && obj instanceof Long) { + String eventTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(((Long) obj).longValue()); + event.put(column.getName(), eventTime); + } else { + event.put(column.getName(), obj.toString()); + } + } + return event; + } + + public Map getExtraData() { + return extraData; + } + + public void setExtraData(Map extraData) { + this.extraData = extraData; + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java index 2d1072d..b00fc78 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java @@ -27,6 +27,8 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; + import java.io.Closeable; import java.io.Serializable; import java.util.List; @@ -85,4 +87,12 @@ public interface IMetadataServiceClient extends Closeable, Serializable { void clear(); // for topology mgmt + + // for alert event + List listAlertPublishEvent(); + + void addAlertPublishEvent(AlertPublishEvent event); + + void addAlertPublishEvents(List events); + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java index 8178824..099767e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java @@ -37,6 +37,7 @@ import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; import com.typesafe.config.Config; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient { private static final String METADATA_POLICIES_PATH = "/metadata/policies"; private static final String METADATA_CLUSTERS_PATH = "/metadata/clusters"; private static final String METADATA_TOPOLOGY_PATH = "/metadata/topologies"; + private static final String METADATA_ALERTS_PATH = "/metadata/alerts"; private static final String METADATA_PUBLISHMENTS_BATCH_PATH = "/metadata/publishments/batch"; private static final String METADATA_DATASOURCES_BATCH_PATH = "/metadata/datasources/batch"; @@ -64,6 +66,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient { private static final String METADATA_POLICIES_BATCH_PATH = "/metadata/policies/batch"; private static final String METADATA_CLUSTERS_BATCH_PATH = "/metadata/clusters/batch"; private static final String METADATA_TOPOLOGY_BATCH_PATH = "/metadata/topologies/batch"; + private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch"; private static final String METADATA_CLEAR_PATH = "/metadata/clear"; @@ -275,4 +278,21 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient { r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(); } + @Override + public List listAlertPublishEvent() { + return list(METADATA_ALERTS_PATH, new GenericType>(){}); + } + + @Override + public void addAlertPublishEvent(AlertPublishEvent event) { + WebResource r = client.resource(basePath + METADATA_ALERTS_PATH); + r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(event); + } + + @Override + public void addAlertPublishEvents(List events) { + WebResource r = client.resource(basePath + METADATA_ALERTS_BATCH_PATH); + r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(events); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java index 5024429..ee7ca54 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java @@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.service.IMetadataServiceClient; /** @@ -53,6 +54,7 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient { private SortedMap scheduleStates = new TreeMap(); private List spoutSpecs = new ArrayList(); private List publishmetns = new ArrayList(); + private List alerts = new ArrayList<>(); @Override public void close() throws IOException { @@ -181,4 +183,19 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient { // do nothing } + @Override + public List listAlertPublishEvent() { + return this.alerts; + } + + @Override + public void addAlertPublishEvent(AlertPublishEvent event) { + this.alerts.add(event); + } + + @Override + public void addAlertPublishEvents(List events) { + this.alerts.addAll(events); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java index 5da4cbd..bbb6ca1 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java @@ -22,6 +22,7 @@ import org.apache.eagle.alert.coordination.model.RouterSpec; import org.apache.eagle.alert.coordination.model.SpoutSpec; import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; import org.apache.eagle.alert.engine.coordinator.MetadataType; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; @@ -102,6 +103,10 @@ public abstract class AbstractMetadataChangeNotifyService implements IMetadataCh alertPublishSpecListeners.forEach(s -> s.onAlertPublishSpecChange(alertPublishSpec, sds)); } + protected void notifyAlertPublishBolt(Map pds) { + alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds)); + } + public void close() throws IOException { LOG.info("Closed"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java index 9b31727..7bf4984 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java @@ -122,6 +122,9 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS } else { prePopulate(alertSpec, state.getPolicySnapshots()); notifyAlertBolt(alertSpec, sds); + if (state.getPublishSpecs().get(topologyId) != null) { + notifyAlertPublishBolt(listToMap(state.getPolicySnapshots())); + } } break; case ALERT_PUBLISH_BOLT: @@ -130,6 +133,9 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId); } else { notifyAlertPublishBolt(pubSpec, sds); + if (state.getAlertSpecs().get(topologyId) != null) { + notifyAlertPublishBolt(listToMap(state.getPolicySnapshots())); + } } break; case SPOUT: http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java index b03305f..d8429ca 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java @@ -17,10 +17,13 @@ package org.apache.eagle.alert.engine.publisher; import org.apache.eagle.alert.coordination.model.PublishSpec; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import java.util.Map; public interface AlertPublishSpecListener { void onAlertPublishSpecChange(PublishSpec spec, Map sds); + + void onAlertPolicyChange(Map pds); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index b155bb8..f0f048d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -18,10 +18,12 @@ package org.apache.eagle.alert.engine.publisher.impl; import com.google.common.base.Joiner; import com.typesafe.config.Config; +import javafx.scene.control.Alert; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.codec.IEventSerializer; import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java new file mode 100644 index 0000000..b9b5f2e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.publisher.impl; + +import com.typesafe.config.Config; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.PublishConstants; +import org.apache.eagle.alert.service.IMetadataServiceClient; +import org.apache.eagle.alert.service.MetadataServiceClientImpl; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AlertEagleStorePlugin extends AbstractPublishPlugin { + + private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class); + private transient IMetadataServiceClient client; + + @Override + public void init(Config config, Publishment publishment, Map conf) throws Exception { + super.init(config, publishment, conf); + client = new MetadataServiceClientImpl(config); + } + + @Override + public void close() { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @SuppressWarnings("rawtypes") + @Override + public void update(String dedupIntervalMin, Map pluginProperties) { + deduplicator.setDedupIntervalMin(dedupIntervalMin); + } + + @Override + public void onAlert(AlertStreamEvent event) throws Exception { + List eventList = this.dedup(event); + if (eventList == null || eventList.isEmpty()) { + return; + } + List alertEvents = new ArrayList<>(); + for (AlertStreamEvent e : eventList) { + alertEvents.add(AlertPublishEvent.createAlertPublishEvent(e)); + } + client.addAlertPublishEvents(alertEvents); + } + + @Override + protected Logger getLogger() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 9755c90..f0ad0ae 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -16,12 +16,11 @@ */ package org.apache.eagle.alert.engine.runner; +import org.apache.commons.collections.map.HashedMap; import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.MetadataType; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; import org.apache.eagle.alert.engine.publisher.AlertPublisher; @@ -48,6 +47,8 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class); private final AlertPublisher alertPublisher; private volatile Map cachedPublishments = new HashMap<>(); + private volatile Map policyDefinitionMap; + private volatile Map streamDefinitionMap; public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) { super(alertPublisherName, coordinatorService, config); @@ -66,7 +67,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli public void execute(Tuple input) { try { streamContext.counter().scope("receive_count"); - alertPublisher.nextEvent((AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1)); + AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1); + wrapAlertPublishEvent(event); + alertPublisher.nextEvent(event); this.collector.ack(input); streamContext.counter().scope("ack_count"); } catch (Exception ex) { @@ -92,6 +95,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli if (pubSpec == null) { return; } + this.streamDefinitionMap = sds; List newPublishments = pubSpec.getPublishments(); if (newPublishments == null) { @@ -112,4 +116,27 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli cachedPublishments = newPublishmentsMap; specVersion = pubSpec.getVersion(); } + + @Override + public void onAlertPolicyChange(Map pds) { + this.policyDefinitionMap = pds; + } + + private void wrapAlertPublishEvent(AlertStreamEvent event) { + Map extraData = new HashedMap(); + List appIds = new ArrayList<>(); + if (this.policyDefinitionMap != null) { + PolicyDefinition policyDefinition = policyDefinitionMap.get(event.getPolicyId()); + for ( String inputStreamId : policyDefinition.getInputStreams()) { + StreamDefinition sd = this.streamDefinitionMap.get(inputStreamId); + if (sd != null) { + extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId()); + appIds.add(sd.getDataSource()); + } + } + extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds); + extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue()); + } + event.setExtraData(extraData); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java index 4efcc28..7f650c6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java @@ -24,6 +24,7 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.service.IMetadataServiceClient; import java.io.IOException; @@ -151,4 +152,19 @@ public class MockMetadataServiceClient implements IMetadataServiceClient { public void clear() { } + + @Override + public List listAlertPublishEvent() { + return null; + } + + @Override + public void addAlertPublishEvent(AlertPublishEvent event) { + + } + + @Override + public void addAlertPublishEvents(List events) { + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index f0a89ce..f5b34b8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -17,6 +17,7 @@ package org.apache.eagle.service.metadata.resource; import com.google.common.base.Preconditions; +import javafx.scene.control.Alert; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; @@ -26,11 +27,13 @@ import org.apache.eagle.alert.engine.coordinator.*; import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter; import org.apache.eagle.alert.engine.interpreter.PolicyParseResult; import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; import com.google.inject.Inject; +import org.apache.storm.zookeeper.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -476,6 +479,28 @@ public class MetadataResource { return results; } + @Path("/alerts") + @POST + public OpResult addAlertPublishEvent(AlertPublishEvent event) { + return dao.addAlertPublishEvent(event); + } + + @Path("/alerts/batch") + @POST + public List addAlertPublishEvents(List events) { + List results = new LinkedList<>(); + for (AlertPublishEvent e : events) { + results.add(dao.addAlertPublishEvent(e)); + } + return results; + } + + @Path("/alerts") + @GET + public List listAlertPublishEvents(@QueryParam("size") int size) { + return dao.listAlertPublishEvent(size); + } + @Path("/topologies/{topologyName}") @DELETE public OpResult removeTopology(@PathParam("topologyName") String topologyName) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java index 06e96c7..d25c548 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java @@ -22,6 +22,7 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; import org.slf4j.Logger; @@ -77,6 +78,10 @@ public interface IMetadataDao extends Closeable { OpResult removePublishmentType(String pubType); + List listAlertPublishEvent(int size); + + OpResult addAlertPublishEvent(AlertPublishEvent event); + ScheduleState getScheduleState(String versionId); ScheduleState getScheduleState(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java index dea5b82..0de13f1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java @@ -21,6 +21,7 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; @@ -54,6 +55,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao { private SortedMap scheduleStates = new TreeMap(); private List assignments = new ArrayList(); private List topologies = new ArrayList(); + private List alerts = new ArrayList<>(); @Inject public InMemMetadataDaoImpl(Config config) { @@ -196,6 +198,22 @@ public class InMemMetadataDaoImpl implements IMetadataDao { return remove(publishmentTypes, pubType); } + @Override + public List listAlertPublishEvent(int size) { + if (size > 0 && size <= alerts.size()) { + return alerts.subList(0, size); + } + return alerts; + } + + @Override + public OpResult addAlertPublishEvent(AlertPublishEvent event) { + alerts.add(event); + OpResult result = new OpResult(); + result.code = 200; + return result; + } + @Override public synchronized OpResult addScheduleState(ScheduleState state) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index bba2d1e..0b595ac 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -21,6 +21,7 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; @@ -72,6 +73,11 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { } @Override + public List listAlertPublishEvent(int size) { + return null; + } + + @Override public ScheduleState getScheduleState(String versionId) { return handler.listWithFilter(versionId, ScheduleState.class); //return null; @@ -103,6 +109,11 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { } @Override + public OpResult addAlertPublishEvent(AlertPublishEvent event) { + return null; + } + + @Override public OpResult createStream(StreamDefinition stream) { return handler.addOrReplace(StreamDefinition.class.getSimpleName(), stream); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java index 274164d..33d9a06 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java @@ -23,6 +23,7 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.MetadataUtils; import com.typesafe.config.Config; import org.apache.ddlutils.Platform; @@ -67,6 +68,7 @@ public class JdbcSchemaManager { registerTableName(ScheduleState.class.getSimpleName(), "schedule_state"); registerTableName(PolicyAssignment.class.getSimpleName(), "assignment"); registerTableName(Topology.class.getSimpleName(), "topology"); + registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event"); } public static JdbcSchemaManager getInstance() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0777770/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java index 4d70800..ab42e7d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java @@ -36,6 +36,7 @@ import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; @@ -74,6 +75,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao { private MongoCollection publishment; private MongoCollection publishmentType; private MongoCollection topologies; + private MongoCollection alerts; // scheduleStates splits to several collections private MongoCollection scheduleStates; @@ -303,6 +305,16 @@ public class MongoMetadataDaoImpl implements IMetadataDao { return remove(publishmentType, pubType); } + @Override + public List listAlertPublishEvent(int size) { + return null; + } + + @Override + public OpResult addAlertPublishEvent(AlertPublishEvent event) { + return null; + } + private OpResult addOne(MongoCollection collection, T t) { OpResult result = new OpResult(); String json = "";