eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: EAGLE-758: add switch to enable and disable event log
Date Wed, 09 Nov 2016 11:08:31 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 4df63def8 -> 9a5cdb26d


EAGLE-758: add switch to enable and disable event log

Author: Li, Garrett
Reviewer: ralphsu

This closes #633


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9a5cdb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9a5cdb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9a5cdb26

Branch: refs/heads/master
Commit: 9a5cdb26d6db15e05a21cc1d6c70e94aded181d7
Parents: 4df63de
Author: Xiancheng Li <xiancheng.li@ebay.com>
Authored: Wed Nov 9 16:59:28 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Wed Nov 9 19:10:24 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/engine/runner/AlertBolt.java       | 13 ++++++++++---
 .../eagle/alert/engine/spout/CorrelationSpout.java | 17 ++++++++++-------
 .../engine/spout/SpoutOutputCollectorWrapper.java  |  8 ++++++--
 3 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index bdd9a99..639d338 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,6 +63,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     private PolicyGroupEvaluator policyGroupEvaluator;
     private AlertStreamCollector alertOutputCollector;
     private String boltId;
+    private boolean logEventEnabled;
     private volatile Object outputLock;
     // mapping from policy name to PolicyDefinition
     private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
// for one streamGroup, there are multiple policies
@@ -74,6 +75,10 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         this.boltId = boltId;
         this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1");
// use bolt id as evaluatorId.
         // TODO next stage evaluator
+
+        if (config.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        }
     }
 
     @Override
@@ -81,7 +86,9 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         this.streamContext.counter().scope("execute_count").incr();
         try {
             PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
-            LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent());
+            if (logEventEnabled) {
+                LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent());
+            }
             String streamEventVersion = pe.getEvent().getMetaVersion();
 
             if (streamEventVersion == null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 67074ce..63e94ca 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -48,7 +48,6 @@ import java.util.*;
 
 /**
  * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
- *
  * <p>1. onNewConfig() is interface for outside to update new metadata. Upon new metadata,
this class will calculate if there is any new topic, removed topic or
  * updated topic</p>
  */
@@ -183,7 +182,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         // decode and get topic
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout !=  null) {
+        if (spout != null) {
             spout.ack(id.id);
         }
     }
@@ -194,7 +193,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         LOG.error("Failing message {}, with topic {}", msgId, id.topic);
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout !=  null) {
+        if (spout != null) {
             spout.fail(id.id);
         }
     }
@@ -287,8 +286,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
      * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer"
      * Note2: put topologyId as part of zkState because one topic by design can be consumed
by multiple topologies so one topology needs to know
      * processed offset for itself
-     *
      * <p>TODO: Should avoid use Config.get in deep calling stack, should generate
config bean as early as possible
+     * </p>
      *
      * @param conf
      * @param context
@@ -311,6 +310,10 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         if (config.hasPath("spout.stormKafkaTransactionZkPath")) {
             transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
         }
+        boolean logEventEnabled = false;
+        if (config.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        }
         // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
         String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
         if (config.hasPath("spout.stormKafkaEagleConsumer")) {
@@ -339,7 +342,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
 
         spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
-        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this,
collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer);
+        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this,
collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
         wrapper.open(conf, context, collectorWrapper);
 
         if (LOG.isInfoEnabled()) {
@@ -352,8 +355,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
         if (scheme instanceof MultiScheme) {
             return (MultiScheme) scheme;
-        } else if (scheme instanceof  Scheme) {
-            return new SchemeAsMultiScheme((Scheme)scheme);
+        } else if (scheme instanceof Scheme) {
+            return new SchemeAsMultiScheme((Scheme) scheme);
         } else {
             LOG.error("create spout scheme failed.");
             throw new IllegalArgumentException("create spout scheme failed.");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index b75814e..1036a36 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -50,6 +50,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
     private final String topic;
     private final PartitionedEventSerializer serializer;
     private int numOfRouterBolts;
+    private boolean logEventEnabled;
 
     private volatile List<StreamRepartitionMetadata> streamRepartitionMetadataList;
     private volatile Tuple2StreamConverter converter;
@@ -67,7 +68,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
                                        String topic,
                                        SpoutSpec spoutSpec,
                                        int numGroupbyBolts,
-                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer
serializer) {
+                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer
serializer, boolean logEventEnabled) {
         super(delegate);
         this.spout = spout;
         this.delegate = delegate;
@@ -77,6 +78,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
         this.numOfRouterBolts = numGroupbyBolts;
         this.sds = sds;
         this.serializer = serializer;
+        this.logEventEnabled = logEventEnabled;
     }
 
     /**
@@ -118,7 +120,9 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector
implements
         }
 
         StreamEvent event = convertToStreamEventByStreamDefinition((Long) convertedTuple.get(2),
m, sds.get(streamId));
-        LOG.info("Spout from topic {} emit event: {}", topic, event);
+        if (logEventEnabled) {
+            LOG.info("Spout from topic {} emit event: {}", topic, event);
+        }
         
         /*
             phase 2: stream repartition


Mime
View raw message