eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: [Fix]: Add simple support for a given MultiScheme instead of always wrapping it in SchemeAsMultiScheme
Date Mon, 19 Sep 2016 19:24:38 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 98dff2480 -> dcb2b5de1


[Fix]: Add simple support for a given MultiScheme instead of always wrapping it in SchemeAsMultiScheme

Author: ralphsu


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dcb2b5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dcb2b5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dcb2b5de

Branch: refs/heads/master
Commit: dcb2b5de1e82d27293405bb03969715f357d5521
Parents: 98dff24
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Mon Sep 19 12:23:27 2016 -0700
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Mon Sep 19 12:23:27 2016 -0700

----------------------------------------------------------------------
 .../alert/engine/spout/CorrelationSpout.java    | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcb2b5de/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 9c04fa4..25c0607 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -18,6 +18,16 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
+import backtype.storm.spout.MultiScheme;
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -29,14 +39,6 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.*;
@@ -331,7 +333,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
         }
 
-        spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf));
+        spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
         SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer);
         wrapper.open(conf, context, collectorWrapper);
@@ -342,6 +344,15 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         return wrapper;
     }
 
+    private MultiScheme createMultiScheme(Map conf, String topic, String schemeClsName) throws Exception {
+        Scheme scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
+        if (scheme instanceof MultiScheme) {
+            return (MultiScheme) scheme;
+        } else {
+            return new SchemeAsMultiScheme(scheme);
+        }
+    }
+
     @Override
     public StreamDefinition getStreamDefinition(String streamId) {
         return sds.get(streamId);


Mime
View raw message