eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject eagle git commit: [EAGLE-971] fix a bug that duplicated queues are generated under a monitored stream
Date Wed, 29 Mar 2017 06:48:14 GMT
Repository: eagle
Updated Branches:
  refs/heads/master a64b622cc -> c01a258b6


[EAGLE-971] fix a bug that duplicated queues are generated under a monitored stream

https://issues.apache.org/jira/browse/EAGLE-971

New policies for alert spec generation
1. each alert bolt has no more than 'coordinator.policiesPerBolt' policies.
2. each alert bolt has no more than 'coordinator.streamsPerBolt' queues if 'reuseBoltInStreams' is true
3. No two queues on one alert bolt share the same StreamGroup.

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #895 from qingwen220/EAGLE-971.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c01a258b
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c01a258b
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c01a258b

Branch: refs/heads/master
Commit: c01a258b6f9b8fa0ca340b605e7ef942b95c3afd
Parents: a64b622
Author: Zhao, Qingwen <qingwzhao@apache.org>
Authored: Wed Mar 29 14:48:09 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Wed Mar 29 14:48:09 2017 +0800

----------------------------------------------------------------------
 eagle-assembly/src/main/conf/eagle.conf           |  6 ++++--
 .../alert/coordinator/TopologyMgmtService.java    |  2 +-
 .../impl/strategies/SameTopologySlotStrategy.java | 18 +++++++++++-------
 .../alert/engine/spout/CorrelationSpout.java      |  7 ++++++-
 4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/eagle.conf b/eagle-assembly/src/main/conf/eagle.conf
index 2912682..496e3d0 100644
--- a/eagle-assembly/src/main/conf/eagle.conf
+++ b/eagle-assembly/src/main/conf/eagle.conf
@@ -142,12 +142,14 @@ application {
 
 # Coordinator Configuration
 coordinator {
-  policiesPerBolt = 5
-  boltParallelism = 5
+#  boltParallelism = 5
   policyDefaultParallelism = 5
   boltLoadUpbound = 0.8
   topologyLoadUpbound = 0.8
   numOfAlertBoltsPerTopology = 5
+  policiesPerBolt = 10
+  streamsPerBolt = 10
+  reuseBoltInStreams = true
   zkConfig {
     zkQuorum = "server.eagle.apache.org:2181"
     zkRoot = "/alert"

http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
index 4ca9d5e..4ede29d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
@@ -56,7 +56,7 @@ public class TopologyMgmtService {
 
     public TopologyMgmtService() {
         Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
-        boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
+        //boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
         numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
index 0e5fd00..e401e98 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -29,6 +29,7 @@ import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
     private final StreamGroup partitionGroup;
     private final TopologyMgmtService mgmtService;
 
-    //    private final int numOfPoliciesBoundPerBolt;
+    private final int numOfPoliciesBoundPerBolt;
     private final double topoLoadUpbound;
     private final boolean reuseBoltInStreams;
     private final int streamsPerBolt;
@@ -62,7 +63,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
         this.mgmtService = mgmtService;
 
         Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-        // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+        numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
         topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
         if (config.hasPath(CoordinatorConstants.REUSE_BOLT_IN_STREAMS)) {
             reuseBoltInStreams = config.getBoolean(CoordinatorConstants.REUSE_BOLT_IN_STREAMS);
@@ -162,12 +163,15 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
         if (!reuseBoltInStreams && alertUsage.getQueueSize() > 0) {
             return false;
         }
-        if (reuseBoltInStreams && alertUsage.getQueueSize() >= streamsPerBolt) {
-            return false;
+        if (reuseBoltInStreams) {
+            if (alertUsage.getQueueSize() >= streamsPerBolt) {
+                return false;
+            }
+            if (alertUsage.getPartitions().contains(partitionGroup)) {
+                return false;
+            }
         }
-        // actually it's now 0;
-        return true;
-        //  return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c01a258b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 60a9b98..e9ee892 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -168,7 +168,12 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     @Override
     public void nextTuple() {
         for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.nextTuple();
+            try {
+                wrapper.nextTuple();
+            } catch (Exception e) {
+                LOG.error("unexpected exception is caught: {}", e.getMessage(), e);
+            }
+
         }
     }
 


Mime
View raw message