eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-741: Make publishment settings both policy & stream awareness
Date Tue, 08 Nov 2016 03:46:31 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master eaab4a9e0 -> a1c5eca05


EAGLE-741: Make publishment settings both policy & stream awareness

Currently our publishment is defined policy specific, we cannot publish the alert into different
places for one policy although there are multiple output streams for this policy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/ccc5ffb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/ccc5ffb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/ccc5ffb5

Branch: refs/heads/master
Commit: ccc5ffb528f8ffd2c750ad57d5bbb3ad5bcbdab8
Parents: ca0fae4
Author: Xiancheng Li <xiancheng.li@ebay.com>
Authored: Mon Nov 7 13:55:20 2016 +0800
Committer: Xiancheng Li <xiancheng.li@ebay.com>
Committed: Mon Nov 7 16:31:07 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/Publishment.java   |  13 ++-
 .../publisher/impl/AbstractPublishPlugin.java   |   1 -
 .../publisher/impl/AlertPublisherImpl.java      | 101 +++++++++++++------
 .../engine/router/TestAlertPublisherBolt.java   |   2 +
 .../src/test/resources/testPublishSpec.json     |   6 +-
 .../src/test/resources/testPublishSpec2.json    |   6 +-
 .../src/test/resources/testPublishSpec3.json    |  42 ++++++++
 7 files changed, 135 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index b224a4b..dbb1844 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -32,6 +32,7 @@ public class Publishment {
     private String name;
     private String type;
     private List<String> policyIds;
+    private List<String> streamIds;
     private String dedupIntervalMin;
     private List<String> dedupFields;
     private String dedupStateField;
@@ -97,6 +98,14 @@ public class Publishment {
         this.policyIds = policyIds;
     }
 
+    public List<String> getStreamIds() {
+        return streamIds;
+    }
+
+    public void setStreamIds(List<String> streamIds) {
+        this.streamIds = streamIds;
+    }
+
     public String getDedupIntervalMin() {
         return dedupIntervalMin;
     }
@@ -130,7 +139,9 @@ public class Publishment {
                 && Objects.equals(dedupFields, p.getDedupFields())
                 && Objects.equals(dedupStateField, p.getDedupStateField())
                 && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
-                && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
+                && Objects.equals(policyIds, p.getPolicyIds())
+                && Objects.equals(streamIds, p.getStreamIds())
+                && properties.equals(p.getProperties()));
         }
         return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index f0f048d..f68ae52 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
 import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 87ac30f..210fd1b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,16 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import com.typesafe.config.Config;
-import org.apache.commons.collections.ListUtils;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -27,20 +35,19 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.typesafe.config.Config;
 
 @SuppressWarnings("rawtypes")
 public class AlertPublisherImpl implements AlertPublisher {
+
     private static final long serialVersionUID = 4809983246198138865L;
     private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
+
+    private static final String STREAM_NAME_DEFAULT = "default";
+
     private final String name;
 
-    private volatile Map<String, List<String>> policyPublishPluginMapping = new
ConcurrentHashMap<>(1);
+    private volatile Map<String, Set<String>> psPublishPluginMapping = new ConcurrentHashMap<>(1);
     private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
     private Config config;
     private Map conf;
@@ -74,13 +81,18 @@ public class AlertPublisherImpl implements AlertPublisher {
             LOG.warn("policyId cannot be null for event to be published");
             return;
         }
-        List<String> pubIds = policyPublishPluginMapping.get(policyId);
+        // use default stream name if specified stream publisher is not found
+        Set<String> pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId,
event.getStreamId()));
         if (pubIds == null) {
-            LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
+            pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId));
+        }
+        if (pubIds == null) {
+            LOG.warn("Policy {} Stream {} does *NOT* subscribe any publishment!", policyId,
event.getStreamId());
             return;
         }
 
         for (String pubId : pubIds) {
+            @SuppressWarnings("resource")
             AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) :
null;
             if (plugin == null) {
                 LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId);
@@ -100,12 +112,11 @@ public class AlertPublisherImpl implements AlertPublisher {
         publishPluginMapping.values().forEach(plugin -> plugin.close());
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public synchronized void onPublishChange(List<Publishment> added,
-                                List<Publishment> removed,
-                                List<Publishment> afterModified,
-                                List<Publishment> beforeModified) {
+                                             List<Publishment> removed,
+                                             List<Publishment> afterModified,
+                                             List<Publishment> beforeModified) {
         if (added == null) {
             added = new ArrayList<>();
         }
@@ -125,7 +136,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
 
         // copy and swap to avoid concurrency issue
-        Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping);
+        Map<String, Set<String>> newPSPublishPluginMapping = new HashMap<>(psPublishPluginMapping);
         Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
 
         // added
@@ -135,7 +146,7 @@ public class AlertPublisherImpl implements AlertPublisher {
             AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment,
config, conf);
             if (plugin != null) {
                 newPublishMap.put(publishment.getName(), plugin);
-                addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(),
publishment.getName());
+                addPublishmentPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(),
publishment.getStreamIds(), publishment.getName());
             } else {
                 LOG.error("OnPublishChange alertPublisher {} failed due to invalid format",
publishment);
             }
@@ -144,7 +155,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
         for (Publishment publishment : removed) {
             String pubName = publishment.getName();
-            removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(),
pubName);
+            removePublihsPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(),
pubName);
             toBeClosed.add(newPublishMap.get(pubName));
             newPublishMap.remove(publishment.getName());
         }
@@ -152,13 +163,14 @@ public class AlertPublisherImpl implements AlertPublisher {
         for (int i = 0; i < afterModified.size(); i++) {
             String pubName = afterModified.get(i).getName();
             List<String> newPolicies = afterModified.get(i).getPolicyIds();
+            List<String> newStreams = afterModified.get(i).getStreamIds();
             List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
+            List<String> oldStreams = beforeModified.get(i).getStreamIds();
 
-            if (!newPolicies.equals(oldPolicies)) {
-                List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
-                removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName);
-                List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
-                addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName);
+            if (!newPolicies.equals(oldPolicies) || !newStreams.equals(oldStreams)) {
+                // since both policy & stream may change, skip the compare and difference
update
+                removePublihsPoliciesStreams(newPSPublishPluginMapping, oldPolicies, pubName);
+                addPublishmentPoliciesStreams(newPSPublishPluginMapping, newPolicies, newStreams,
pubName);
             }
             Publishment newPub = afterModified.get(i);
             newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
@@ -166,7 +178,7 @@ public class AlertPublisherImpl implements AlertPublisher {
 
         // now do the swap
         publishPluginMapping = newPublishMap;
-        policyPublishPluginMapping = newPolicyPublishPluginMapping;
+        psPublishPluginMapping = newPSPublishPluginMapping;
 
         // safely close : it depend on plugin to check if want to wait all data to be flushed.
         closePlugins(toBeClosed);
@@ -182,26 +194,51 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
     }
 
-    private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping,
List<String> addedPolicyIds, String pubName) {
+    private void addPublishmentPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping,
+                                               List<String> addedPolicyIds, List<String>
addedStreamIds, String pubName) {
         if (addedPolicyIds == null || pubName == null) {
             return;
         }
 
+        if (addedStreamIds == null || addedStreamIds.size() <= 0) {
+            addedStreamIds = new ArrayList<String>();
+            addedStreamIds.add(STREAM_NAME_DEFAULT);
+        }
+
         for (String policyId : addedPolicyIds) {
-            newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>());
-            newPolicyPublishPluginMapping.get(policyId).add(pubName);
+            for (String streamId : addedStreamIds) {
+                String psUniqueId = getPolicyStreamUniqueId(policyId, streamId);
+                newPSPublishPluginMapping.putIfAbsent(psUniqueId, new HashSet<>());
+                newPSPublishPluginMapping.get(psUniqueId).add(pubName);
+            }
         }
     }
 
-    private synchronized void removePublihsPolicies(Map<String, List<String>>
newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) {
+    private synchronized void removePublihsPoliciesStreams(Map<String, Set<String>>
newPSPublishPluginMapping,
+                                                           List<String> deletedPolicyIds,
String pubName) {
         if (deletedPolicyIds == null || pubName == null) {
             return;
         }
 
         for (String policyId : deletedPolicyIds) {
-            List<String> publishIds = newPolicyPublishPluginMapping.get(policyId);
-            publishIds.remove(pubName);
+            for (Entry<String, Set<String>> entry : newPSPublishPluginMapping.entrySet())
{
+                if (entry.getKey().startsWith("policyId:" + policyId)) {
+                    entry.getValue().remove(pubName);
+                    break;
+                }
+            }
+        }
+    }
+
+    private String getPolicyStreamUniqueId(String policyId) {
+        return getPolicyStreamUniqueId(policyId, STREAM_NAME_DEFAULT);
+    }
+
+    private String getPolicyStreamUniqueId(String policyId, String streamId) {
+        if (StringUtils.isBlank(streamId)) {
+            streamId = STREAM_NAME_DEFAULT;
         }
+        return String.format("policyId:%s,streamId:%s", policyId, streamId);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 5cdb6f1..c95cab1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -136,6 +136,7 @@ public class TestAlertPublisherBolt {
     public void testMapComparator() {
         PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"),
PublishSpec.class);
         PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"),
PublishSpec.class);
+        PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"),
PublishSpec.class);
         Map<String, Publishment> map1 = new HashMap<>();
         Map<String, Publishment> map2 = new HashMap<>();
         spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
@@ -148,6 +149,7 @@ public class TestAlertPublisherBolt {
         AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test",
null, null);
         publisherBolt.onAlertPublishSpecChange(spec1, null);
         publisherBolt.onAlertPublishSpecChange(spec2, null);
+        publisherBolt.onAlertPublishSpecChange(spec3, null);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
index 70ea6b3..a8f4105 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json
@@ -11,6 +11,9 @@
         "policy2",
         "policy3"
       ],
+      "streamIds": [
+        "stream1"
+      ],
       "dedupIntervalMin": "PT1M",
       "properties": {
         "subject": "Test Alert",
@@ -24,7 +27,8 @@
         "mail.debug": "false",
         "mail.connection": "tls",
         "mail.smtp.port": "587"
-      }
+      },
+      "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
     }
     /*    {
           "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
index e14db43..e31e1f4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json
@@ -9,6 +9,9 @@
       "policyIds": [
         "policy1"
       ],
+      "streamIds": [
+        "stream1"
+      ],
       "dedupIntervalMin": "PT2M",
       "properties": {
         "subject": "Test Alert",
@@ -22,7 +25,8 @@
         "mail.debug": "false",
         "mail.connection": "tls",
         "mail.smtp.port": "587"
-      }
+      },
+      "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
     }
     //    {
     //      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
new file mode 100644
index 0000000..0bf0e2a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json
@@ -0,0 +1,42 @@
+{
+  "version": "version1",
+  "topologyName": "testTopology",
+  "boltId": "alertPublishBolt",
+  "publishments": [
+    {
+      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+      "name": "email-testAlertStream",
+      "policyIds": [
+        "policy1"
+      ],
+      "streamIds": [
+        "stream2"
+      ],
+      "dedupIntervalMin": "PT2M",
+      "properties": {
+        "subject": "Test Alert",
+        "template": "",
+        "sender": "sender@corp.com",
+        "recipients": "receiver@corp.com",
+        "mail.smtp.host": "smtp.mailhost.com",
+        "mail.smtp.auth": "true",
+        "mail.username": "username",
+        "mail.password": "password",
+        "mail.debug": "false",
+        "mail.connection": "tls",
+        "mail.smtp.port": "587"
+      },
+      "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
+    }
+    //    {
+    //      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    //      "name":"kafka-testAlertStream",
+    //      "policyIds": ["testPolicy"],
+    //      "dedupIntervalMin": "PT1M",
+    //      "properties":{
+    //        "kafka_broker":"sandbox.hortonworks.com:6667",
+    //        "topic":"test_kafka"
+    //      }
+    //    }
+  ]
+}
\ No newline at end of file


Mime
View raw message