eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject eagle git commit: EAGLE-943: Return topology builder to enable flink-storm topology build.
Date Fri, 03 Mar 2017 12:27:43 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 287a1c109 -> c75eadd44


EAGLE-943: Return topology builder to enable flink-storm topology build.

Author: ralphsu

This closes #855


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c75eadd4
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c75eadd4
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c75eadd4

Branch: refs/heads/master
Commit: c75eadd44d097df3e6ff830ddaebadbf7751a637
Parents: 287a1c1
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Fri Mar 3 20:14:42 2017 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Fri Mar 3 20:22:40 2017 +0800

----------------------------------------------------------------------
 .../eagle/alert/engine/UnitTopologyMain.java    | 24 ++++++++++++++++----
 .../alert/engine/runner/UnitTopologyRunner.java | 24 ++++++++++----------
 2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/c75eadd4/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
index 01b16b8..055032f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -19,17 +19,20 @@
 
 package org.apache.eagle.alert.engine;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
 
 
 /**
@@ -84,6 +87,19 @@ public class UnitTopologyMain {
         String topologyId = getTopologyName(config);
         ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
 
+        return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config).createTopology();
+    }
+    
+    /**
+     * Returns a builder instead of topology itself. This makes it possible to run storm-flink conversion.
+     * 
+     * @param config
+     * @return
+     */
+    public static TopologyBuilder createTopologyBuilder(Config config) {
+        String topologyId = getTopologyName(config);
+        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
+
         return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config);
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/c75eadd4/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 287d5db..3f06f66 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,12 +19,19 @@
 
 package org.apache.eagle.alert.engine.runner;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.spout.CorrelationSpout;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
@@ -33,17 +40,10 @@ import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
 /**
  * By default
  * 1. one spout with multiple tasks
@@ -106,7 +106,7 @@ public class UnitTopologyRunner {
         }
 
         stormConfig.setNumWorkers(numOfTotalWorkers);
-        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
+        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config).createTopology();
 
         if (localMode) {
             LOG.info("Submitting as local mode");
@@ -143,7 +143,7 @@ public class UnitTopologyRunner {
     // Build Storm Topology
     // ---------------------------
 
-    public StormTopology buildTopology(String topologyId,
+    public TopologyBuilder buildTopology(String topologyId,
                                        int numOfSpoutTasks,
                                        int numOfRouterBolts,
                                        int numOfAlertBolts,
@@ -208,10 +208,10 @@ public class UnitTopologyRunner {
             boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0));
         }
 
-        return builder.createTopology();
+        return builder;
     }
 
-    public StormTopology buildTopology(String topologyId, Config config) {
+    public TopologyBuilder buildTopology(String topologyId, Config config) {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
         int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);


Mime
View raw message