Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CAEB9200BBE for ; Fri, 28 Oct 2016 04:11:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C9680160B01; Fri, 28 Oct 2016 02:11:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 50001160AF6 for ; Fri, 28 Oct 2016 04:11:08 +0200 (CEST) Received: (qmail 95809 invoked by uid 500); 28 Oct 2016 02:11:07 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 95800 invoked by uid 99); 28 Oct 2016 02:11:07 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 02:11:07 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 19E911A01F3 for ; Fri, 28 Oct 2016 02:11:07 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 6CHDK5QDTuMK for ; Fri, 28 Oct 2016 02:11:02 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 8D4425FBD4 for ; Fri, 28 Oct 2016 02:11:00 +0000 (UTC) Received: (qmail 95701 invoked by uid 99); 28 Oct 2016 02:10:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 02:10:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A2FEF0BF2; Fri, 28 Oct 2016 02:10:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jilin@apache.org To: commits@eagle.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [MINOR] Enhance new publisher AlertEagleStorePlugin Date: Fri, 28 Oct 2016 02:10:59 +0000 (UTC) archived-at: Fri, 28 Oct 2016 02:11:10 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master f07777706 -> 2e5cb719c [MINOR] Enhance new publisher AlertEagleStorePlugin https://issues.apache.org/jira/browse/EAGLE-681 * implement alert apis on jdbc & mongodb * add a rest api for searching a alert by alertId Author: Zhao, Qingwen Closes #575 from qingwen220/EAGLE-681. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2e5cb719 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2e5cb719 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2e5cb719 Branch: refs/heads/master Commit: 2e5cb719c3a49da920d8d6807e10bcfdb6e55934 Parents: f077777 Author: Zhao, Qingwen Authored: Fri Oct 28 10:10:50 2016 +0800 Committer: zombieJ Committed: Fri Oct 28 10:10:50 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/model/AlertPublishEvent.java | 5 +- .../alert/engine/model/AlertStreamEvent.java | 4 +- .../AbstractMetadataChangeNotifyService.java | 4 +- .../impl/ZKMetadataChangeNotifyService.java | 4 +- .../publisher/AlertPublishSpecListener.java | 2 +- .../publisher/impl/AlertEagleStorePlugin.java | 2 +- .../alert/engine/runner/AlertPublisherBolt.java | 3 +- .../metadata/resource/MetadataResource.java | 8 +- .../eagle/alert/metadata/IMetadataDao.java | 2 + .../eagle/alert/metadata/MetadataUtils.java | 4 + .../metadata/impl/InMemMetadataDaoImpl.java | 10 +++ .../metadata/impl/JdbcDatabaseHandler.java | 94 ++++++-------------- .../metadata/impl/JdbcMetadataDaoImpl.java | 11 ++- .../metadata/impl/MongoMetadataDaoImpl.java | 23 ++++- .../alert/resource/impl/MongoImplTest.java | 11 +++ 15 files changed, 106 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java index 9d73c2d..3453e55 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java @@ -96,7 +96,8 @@ public class AlertPublishEvent { AlertPublishEvent alertEvent = new AlertPublishEvent(); alertEvent.setAlertId(UUID.randomUUID().toString()); alertEvent.setPolicyId(event.getPolicyId()); - if (event.getExtraData() != null) { + alertEvent.setAlertTimestamp(event.getCreatedTime()); + if (event.getExtraData() != null && !event.getExtraData().isEmpty()) { alertEvent.setSiteId(event.getExtraData().get(SITE_ID_KEY).toString()); alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString()); alertEvent.setAppIds((List) event.getExtraData().get(APP_IDS_KEY)); @@ -104,4 +105,6 @@ public class AlertPublishEvent { alertEvent.setAlertData(event.getDataMap()); return alertEvent; } + } + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java index e502244..442c885 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java @@ -72,8 +72,8 @@ public class AlertStreamEvent extends StreamEvent { } } return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, createdBy=%s, metaVersion=%s]", - this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), - StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion()); + this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), + StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion()); } public String getCreatedBy() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java index bbb6ca1..3264e61 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java @@ -103,8 +103,8 @@ public abstract class AbstractMetadataChangeNotifyService implements IMetadataCh alertPublishSpecListeners.forEach(s -> s.onAlertPublishSpecChange(alertPublishSpec, sds)); } - protected void notifyAlertPublishBolt(Map pds) { - alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds)); + protected void notifyAlertPublishBolt(Map pds, Map sds) { + alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds, sds)); } public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java index 7bf4984..8dc6fdd 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java @@ -123,7 +123,7 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS prePopulate(alertSpec, state.getPolicySnapshots()); notifyAlertBolt(alertSpec, sds); if (state.getPublishSpecs().get(topologyId) != null) { - notifyAlertPublishBolt(listToMap(state.getPolicySnapshots())); + notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds); } } break; @@ -134,7 +134,7 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS } else { notifyAlertPublishBolt(pubSpec, sds); if (state.getAlertSpecs().get(topologyId) != null) { - notifyAlertPublishBolt(listToMap(state.getPolicySnapshots())); + notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds); } } break; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java index d8429ca..b5401cb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java @@ -25,5 +25,5 @@ import java.util.Map; public interface AlertPublishSpecListener { void onAlertPublishSpecChange(PublishSpec spec, Map sds); - void onAlertPolicyChange(Map pds); + void onAlertPolicyChange(Map pds, Map sds); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java index b9b5f2e..f517078 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java @@ -78,4 +78,4 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin { return null; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index f0ad0ae..95f2a8f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -118,8 +118,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli } @Override - public void onAlertPolicyChange(Map pds) { + public void onAlertPolicyChange(Map pds, Map sds) { this.policyDefinitionMap = pds; + this.streamDefinitionMap = sds; } private void wrapAlertPublishEvent(AlertStreamEvent event) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index f5b34b8..05455f0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -17,7 +17,6 @@ package org.apache.eagle.service.metadata.resource; import com.google.common.base.Preconditions; -import javafx.scene.control.Alert; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; @@ -33,7 +32,6 @@ import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; import com.google.inject.Inject; -import org.apache.storm.zookeeper.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -501,6 +499,12 @@ public class MetadataResource { return dao.listAlertPublishEvent(size); } + @Path("/alerts/{alertId}") + @GET + public AlertPublishEvent getAlertPublishEvent(@PathParam("alertId") String alertId) { + return dao.getAlertPublishEvent(alertId); + } + @Path("/topologies/{topologyName}") @DELETE public OpResult removeTopology(@PathParam("topologyName") String topologyName) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java index d25c548..33a3c39 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java @@ -80,6 +80,8 @@ public interface IMetadataDao extends Closeable { List listAlertPublishEvent(int size); + AlertPublishEvent getAlertPublishEvent(String alertId); + OpResult addAlertPublishEvent(AlertPublishEvent event); ScheduleState getScheduleState(String versionId); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java index 7015a53..841a0a0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java @@ -23,6 +23,7 @@ import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.engine.coordinator.PublishmentType; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import com.typesafe.config.Config; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,9 @@ public class MetadataUtils { if (t instanceof ScheduleState) { return ((ScheduleState) t).getVersion(); } + if (t instanceof AlertPublishEvent) { + return ((AlertPublishEvent) t).getAlertId(); + } try { Method m = t.getClass().getMethod("getName"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java index 0de13f1..ad7d353 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java @@ -207,6 +207,16 @@ public class InMemMetadataDaoImpl implements IMetadataDao { } @Override + public AlertPublishEvent getAlertPublishEvent(String alertId) { + Optional op = alerts.stream().filter(alert -> alert.getAlertId().equals(alertId)).findAny(); + if (op.isPresent()) { + return op.get(); + } else { + return null; + } + } + + @Override public OpResult addAlertPublishEvent(AlertPublishEvent event) { alerts.add(event); OpResult result = new OpResult(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java index e484e09..d9580f1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java @@ -42,8 +42,11 @@ public class JdbcDatabaseHandler { private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?"; private static final String UPDATE_STATEMENT = "UPDATE %s set value=? WHERE id=?"; private static final String QUERY_ALL_STATEMENT = "SELECT value FROM %s"; - private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?"; + private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=%s"; private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s"; + private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s"; + + public enum SortType { DESC, ASC } private Map tblNameMap = new HashMap<>(); @@ -151,58 +154,20 @@ public class JdbcDatabaseHandler { public List list(Class clz) { String tb = getTableName(clz.getSimpleName()); - List result = new LinkedList<>(); - try { - Statement statement = connection.createStatement(); - ResultSet rs = statement.executeQuery(String.format(QUERY_ALL_STATEMENT, tb)); - while (rs.next()) { - //String key = rs.getString(1); - String json = rs.getString(1); - try { - T obj = mapper.readValue(json, clz); - result.add(obj); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - } - rs.close(); - statement.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - return result; + String query = String.format(QUERY_ALL_STATEMENT, tb); + return executeSelectStatement(clz, query); + } + + public List listSubset(Class clz, int size) { + String tb = getTableName(clz.getSimpleName()); + String query = String.format(QUERY_ALL_STATEMENT_WITH_SIZE, tb, size); + return executeSelectStatement(clz, query); } public T listWithFilter(String key, Class clz) { String tb = getTableName(clz.getSimpleName()); - List result = new LinkedList<>(); - PreparedStatement statement = null; - try { - statement = connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb)); - statement.setString(1, key); - ResultSet rs = statement.executeQuery(); - while (rs.next()) { - //String key = rs.getString(1); - String json = rs.getString(1); - try { - T obj = mapper.readValue(json, clz); - result.add(obj); - } catch (IOException e) { - LOG.error("deserialize config item failed!", e); - } - } - rs.close(); - } catch (SQLException e) { - e.printStackTrace(); - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOG.warn("Close statement failed"); - } - } - } + String query = String.format(QUERY_CONDITION_STATEMENT, tb, key); + List result = executeSelectStatement(clz, query); if (result.isEmpty()) { return null; } else { @@ -212,11 +177,21 @@ public class JdbcDatabaseHandler { public T listTop(Class clz, String sortType) { String tb = getTableName(clz.getSimpleName()); + String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType); + List result = executeSelectStatement(clz, query); + if (result.isEmpty()) { + return null; + } else { + return result.get(0); + } + } + + public List executeSelectStatement(Class clz, String query) { + String tb = getTableName(clz.getSimpleName()); List result = new LinkedList<>(); - PreparedStatement statement = null; try { - statement = connection.prepareStatement(String.format(QUERY_ORDERBY_STATEMENT, tb, sortType)); - ResultSet rs = statement.executeQuery(); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery(query); while (rs.next()) { //String key = rs.getString(1); String json = rs.getString(1); @@ -228,22 +203,11 @@ public class JdbcDatabaseHandler { } } rs.close(); + statement.close(); } catch (SQLException e) { e.printStackTrace(); - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOG.warn("Close statement failed"); - } - } - } - if (result.isEmpty()) { - return null; - } else { - return result.get(0); } + return result; } public OpResult remove(String clzName, String key) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index 0b595ac..adf26d8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -74,7 +74,12 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { @Override public List listAlertPublishEvent(int size) { - return null; + return handler.listSubset(AlertPublishEvent.class, size); + } + + @Override + public AlertPublishEvent getAlertPublishEvent(String alertId) { + return handler.listWithFilter(alertId, AlertPublishEvent.class); } @Override @@ -85,7 +90,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { @Override public ScheduleState getScheduleState() { - return handler.listTop(ScheduleState.class, "DESC"); + return handler.listTop(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString()); } @Override @@ -110,7 +115,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { @Override public OpResult addAlertPublishEvent(AlertPublishEvent event) { - return null; + return handler.addOrReplace(AlertPublishEvent.class.getSimpleName(), event); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java index ab42e7d..f8186ac 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java @@ -125,6 +125,15 @@ public class MongoMetadataDaoImpl implements IMetadataDao { publishmentType.createIndex(doc1, io1); } + alerts = db.getCollection("alerts"); + { + IndexOptions io1 = new IndexOptions().background(true).unique(true).name("alertIndex"); + BsonDocument doc1 = new BsonDocument(); + doc1.append("alertId", new BsonInt32(1)); + alerts.createIndex(doc1, io1); + } + + // below is for schedule_specs and its splitted collections BsonDocument doc1 = new BsonDocument(); IndexOptions io1 = new IndexOptions().background(true).name("versionIndex"); @@ -179,6 +188,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao { filter.append("streamId", new BsonString(MetadataUtils.getKey(t))); } else if (t instanceof PublishmentType) { filter.append("type", new BsonString(MetadataUtils.getKey(t))); + } else if (t instanceof AlertPublishEvent) { + filter.append("alertId", new BsonString(MetadataUtils.getKey(t))); } else { filter.append("name", new BsonString(MetadataUtils.getKey(t))); } @@ -307,12 +318,22 @@ public class MongoMetadataDaoImpl implements IMetadataDao { @Override public List listAlertPublishEvent(int size) { + return list(alerts, AlertPublishEvent.class); + } + + @Override + public AlertPublishEvent getAlertPublishEvent(String alertId) { + List results = list(alerts, AlertPublishEvent.class); + Optional op = results.stream().filter(alert -> alert.getAlertId().equals(alertId)).findAny(); + if (op.isPresent()) { + return op.get(); + } return null; } @Override public OpResult addAlertPublishEvent(AlertPublishEvent event) { - return null; + return addOrReplace(alerts, event); } private OpResult addOne(MongoCollection collection, T t) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java index 7f08133..ff83b80 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java @@ -32,6 +32,7 @@ import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.StreamGroup; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl; import org.apache.eagle.alert.metadata.resource.OpResult; @@ -206,6 +207,16 @@ public class MongoImplTest { assigns = dao.listStreams(); Assert.assertEquals(0, assigns.size()); } + // alert + { + AlertPublishEvent alert = new AlertPublishEvent(); + alert.setAlertTimestamp(System.currentTimeMillis()); + alert.setAlertId(UUID.randomUUID().toString()); + OpResult result = dao.addAlertPublishEvent(alert); + Assert.assertEquals(200, result.code); + List alerts = dao.listAlertPublishEvent(2); + Assert.assertEquals(1, alerts.size()); + } } private void test_addstate() {