gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/4] incubator-gobblin git commit: [GOBBLIN-3] Multi-hop flow compiler implementation
Date Tue, 12 Sep 2017 09:30:01 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ea5047ea2 -> 9402a9037


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
new file mode 100644
index 0000000..456f3a3
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import com.google.common.base.Splitter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.modules.policy.ServicePolicy;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+import org.slf4j.Logger;
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
+import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
+
+// Users are capable of injecting hints/prioritization into route selection, in two forms:
+// 1. PolicyBasedBlockedConnection: Define some undesired routes.
+// 2. Specify a complete path. The FlowCompiler is responsible for verifying whether the given path is valid.
+
+// TODO: Flow monitoring, injecting weight for flowEdge:ETL-6213
+@Slf4j
+public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
+
+  private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
+  @Getter
+  private DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph =
+      new DirectedWeightedMultigraph<>(LoadBasedFlowEdgeImpl.class);
+
+  public ServicePolicy servicePolicy;
+
+  // Contains user-specified complete path of how the data movement is executed from source to sink.
+  private Optional<String> optionalUserSpecifiedPath;
+
+  private FlowEdgeProps defaultFlowEdgeProps = new FlowEdgeProps();
+
+  public MultiHopsFlowToJobSpecCompiler(Config config) {
+    this(config, Optional.absent(), true);
+  }
+
+  public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
+    this(config, log, true);
+  }
+
+  public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
+    super(config, log, instrumentationEnabled);
+    String policyClassName = config.hasPath(SERVICE_POLICY_NAME) ? config.getString(SERVICE_POLICY_NAME)
+        : ServiceConfigKeys.DEFAULT_SERVICE_POLICY;
+    ClassAliasResolver<ServicePolicy> classResolver = new ClassAliasResolver<>(ServicePolicy.class);
+    try {
+      servicePolicy = classResolver.resolveClass(policyClassName).newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException("Error happen when resolving class for :" + policyClassName, e);
+    }
+
+    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)
+        && config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION).size() > 0) {
+      try {
+        for (String sourceSinkPair : config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) {
+          BaseServiceNodeImpl source = new BaseServiceNodeImpl(sourceSinkPair.split(":")[0]);
+          BaseServiceNodeImpl sink = new BaseServiceNodeImpl(sourceSinkPair.split(":")[1]);
+          URI specExecutorURI = new URI(sourceSinkPair.split(":")[2]);
+          servicePolicy.addFlowEdge(
+              new LoadBasedFlowEdgeImpl(source, sink, InMemorySpecExecutor.createDummySpecExecutor(specExecutorURI)));
+        }
+      } catch (URISyntaxException e) {
+        this.log.warn("Constructing of FlowEdge in ServicePolicy Failed");
+      }
+    }
+
+    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES) &&
+        StringUtils.isNotBlank(config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) {
+      for (String blacklistedNode : SPLIT_BY_COMMA.splitToList(
+          config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) {
+        servicePolicy.addServiceNode(new BaseServiceNodeImpl(blacklistedNode));
+      }
+    }
+
+    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH) && StringUtils.isNotBlank(
+        config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH))) {
+      optionalUserSpecifiedPath = Optional.of(config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH));
+    } else {
+      optionalUserSpecifiedPath = Optional.absent();
+    }
+  }
+
+  @Override
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
+    // A Map from JobSpec to SpexExecutor, as the output of Flow Compiler.
+    Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap();
+    findPath(specExecutorInstanceMap, spec);
+    return specExecutorInstanceMap;
+  }
+
+  /**
+   * @return Transform a set of {@link TopologySpec} into a instance of {@link org.jgrapht.graph.WeightedMultigraph}
+   * and filter out connections between blacklisted vertices that user specified.
+   * The output of this function only stays in memory, so each time a logical flow is compiled, the multigraph will
+   * be re-calculated.
+   *
+   */
+  private void inMemoryWeightGraphGenerator() {
+    for (TopologySpec topologySpec : topologySpecMap.values()) {
+      weightGraphGenerateHelper(topologySpec);
+    }
+
+    // Filter out connection appearing in servicePolicy.
+    // This is where servicePolicy is enforced.
+    servicePolicy.populateBlackListedEdges(this.weightedGraph);
+    if (servicePolicy.getBlacklistedEdges().size() > 0) {
+      for (FlowEdge toDeletedEdge : servicePolicy.getBlacklistedEdges()) {
+        weightedGraph.removeEdge(toDeletedEdge);
+      }
+    }
+  }
+
+  // Basically a dijkstra path finding for connecting source and sink by multiple hops in between.
+  // If there's any user-specified prioritization, conduct the DFS and see if the user-specified path is available.
+
+  // there's no updates on TopologySpec, or user should be aware of the possibility
+  // that a topologySpec not being reflected in findPath.
+  private void findPath(Map<Spec, SpecExecutor> specExecutorInstanceMap, Spec spec) {
+    inMemoryWeightGraphGenerator();
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (optionalUserSpecifiedPath.isPresent()) {
+      log.info("Starting to evaluate user's specified path ... ");
+      if (userSpecifiedPathVerificator(specExecutorInstanceMap, flowSpec)) {
+        log.info("User specified path[ " + optionalUserSpecifiedPath.get() + "] successfully verified.");
+        return;
+      } else {
+        log.error("Will not execute user specified path[ " + optionalUserSpecifiedPath.get() + "]");
+        log.info("Start to execute FlowCompiler's algorithm for valid data movement path");
+      }
+    }
+
+    ServiceNode sourceNode =
+        new BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY));
+
+    ServiceNode targetNode =
+        new BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY));
+
+    List<FlowEdge> resultEdgePath = dijkstraBasedPathFindingHelper(sourceNode, targetNode, this.weightedGraph);
+    for (int i = 0; i < resultEdgePath.size() ; i++) {
+      FlowEdge tmpFlowEdge = resultEdgePath.get(i);
+      ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getSourceNode();
+      ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getTargetNode();
+      specExecutorInstanceMap.put(jobSpecGenerator(edgeSrcNode, edgeTgtNode, flowSpec),
+          ((LoadBasedFlowEdgeImpl) (resultEdgePath.get(i))).getSpecExecutorInstance());
+    }
+  }
+
+  /**
+   * As the base implementation here, all templates will be considered for each edge.
+   */
+  @Override
+  protected void populateEdgeTemplateMap() {
+    if (templateCatalog.isPresent()) {
+      for (FlowEdge flowEdge : this.weightedGraph.edgeSet()) {
+        edgeTemplateMap.put(flowEdge.getEdgeIdentity(), templateCatalog.get().
+            getAllTemplates().
+            stream().map(jobTemplate -> jobTemplate.getUri()).collect(Collectors.toList()));
+      }
+    }
+  }
+
+  // If path specified not existed, return false;
+  // else return true.
+  private boolean userSpecifiedPathVerificator(Map<Spec, SpecExecutor> specExecutorInstanceMap, FlowSpec flowSpec) {
+    Map<Spec, SpecExecutor> tmpSpecExecutorInstanceMap = new HashMap<>();
+    List<String> userSpecfiedPath = Arrays.asList(optionalUserSpecifiedPath.get().split(","));
+    for (int i = 0; i < userSpecfiedPath.size() - 1; i++) {
+      ServiceNode sourceNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i));
+      ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i + 1));
+      if (weightedGraph.containsVertex(sourceNode) && weightedGraph.containsVertex(targetNode)
+          && weightedGraph.containsEdge(sourceNode, targetNode)) {
+        tmpSpecExecutorInstanceMap.put(jobSpecGenerator(sourceNode, targetNode, flowSpec),
+            (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, targetNode)).getSpecExecutorInstance()));
+      } else {
+        log.error("User Specified Path is invalid");
+        return false;
+      }
+    }
+    specExecutorInstanceMap.putAll(tmpSpecExecutorInstanceMap);
+    return true;
+  }
+
+  // Helper function for transform TopologySpecMap into a weightedDirectedGraph.
+  private void weightGraphGenerateHelper(TopologySpec topologySpec) {
+    try {
+      Map<ServiceNode, ServiceNode> capabilities = topologySpec.getSpecExecutor().getCapabilities().get();
+      for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
+
+        BaseServiceNodeImpl sourceNode = new BaseServiceNodeImpl(capability.getKey().getNodeName());
+        BaseServiceNodeImpl targetNode = new BaseServiceNodeImpl(capability.getValue().getNodeName());
+
+        if (!weightedGraph.containsVertex(sourceNode)) {
+          weightedGraph.addVertex(sourceNode);
+        }
+        if (!weightedGraph.containsVertex(targetNode)) {
+          weightedGraph.addVertex(targetNode);
+        }
+
+        FlowEdge flowEdge =
+            new LoadBasedFlowEdgeImpl(sourceNode, targetNode, defaultFlowEdgeProps, topologySpec.getSpecExecutor());
+
+        // In Multi-Graph if flowEdge existed, just skip it.
+        if (!weightedGraph.containsEdge(flowEdge)) {
+          weightedGraph.addEdge(sourceNode, targetNode, flowEdge);
+        }
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      Instrumented.markMeter(this.flowCompilationFailedMeter);
+      throw new RuntimeException("Cannot determine topology capabilities", e);
+    }
+  }
+
+  /**
+   * Generate JobSpec based on the #templateURI that user specified.
+   */
+  private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowEdge flowEdge, URI templateURI,
+      FlowSpec flowSpec) {
+    JobSpec jobSpec;
+    JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode))
+        .withConfig(flowSpec.getConfig())
+        .withDescription(flowSpec.getDescription())
+        .withVersion(flowSpec.getVersion());
+    if (edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity()) && edgeTemplateMap.get(flowEdge.getEdgeIdentity())
+        .contains(templateURI)) {
+      jobSpecBuilder.withTemplate(templateURI);
+      try {
+        jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
+        log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+      } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
+        throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
+      }
+    } else {
+      jobSpec = jobSpecBuilder.build();
+      log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+    }
+    return jobSpec;
+  }
+
+  /**
+   * A naive implementation of resolving templates in each JobSpec among Multi-hop FlowSpec.
+   * Handle the case when edge is not specified.
+   * Always select the first available template.
+   */
+  private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) {
+    FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, targetNode).iterator().next();
+    URI firstTemplateURI =
+        (edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
+            flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getUri();
+    return this.jobSpecGenerator(sourceNode, targetNode, flowEdge, firstTemplateURI, flowSpec);
+  }
+
+  /**
+   * A naive implementation of generating a jobSpec's URI within a multi-hop logical Flow.
+   */
+  public static URI jobSpecURIGenerator(FlowSpec flowSpec, ServiceNode sourceNode, ServiceNode targetNode) {
+    try {
+      return new URI(flowSpec.getUri().getScheme(), flowSpec.getUri().getAuthority(),
+          "/" + sourceNode.getNodeName() + "-" + targetNode.getNodeName(), null);
+    } catch (URISyntaxException e) {
+      log.error(
+          "URI construction failed when jobSpec from " + sourceNode.getNodeName() + " to " + targetNode.getNodeName());
+      throw new RuntimeException();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 65deeec..261ce6e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -22,12 +22,8 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
-import lombok.Getter;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.slf4j.Logger;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -40,10 +36,8 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
-
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecCompiler;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -54,6 +48,13 @@ import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.slf4j.LoggerFactory;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.configuration.State;
+import org.slf4j.Logger;
+
+import lombok.Getter;
 
 
 /**
@@ -179,7 +180,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   public void orchestrate(Spec spec) throws Exception {
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
-      Map<Spec, SpecExecutorInstanceProducer> specExecutorInstanceMap = specCompiler.compileFlow(spec);
+      Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec);
 
       if (specExecutorInstanceMap.isEmpty()) {
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
@@ -187,18 +188,18 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       }
 
       // Schedule all compiled JobSpecs on their respective Executor
-      for (Map.Entry<Spec, SpecExecutorInstanceProducer> specsToExecute : specExecutorInstanceMap.entrySet()) {
+      for (Map.Entry<Spec, SpecExecutor> specsToExecute : specExecutorInstanceMap.entrySet()) {
         // Run this spec on selected executor
-        SpecExecutorInstanceProducer producer = null;
+        SpecProducer producer = null;
         try {
-          producer = specsToExecute.getValue();
+          producer = specsToExecute.getValue().getProducer().get();
           Spec jobSpec = specsToExecute.getKey();
 
           _log.info(String.format("Going to orchestrate JobSpc: %s on Executor: %s", jobSpec, producer));
           producer.addSpec(jobSpec);
         } catch(Exception e) {
           _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer +
-            " for flow: " + spec, e);
+              " for flow: " + spec, e);
         }
       }
     } else {
@@ -221,7 +222,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   }
 
   @Override
-  public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
+  public List<Tag<?>> generateTags(State state) {
     return Collections.emptyList();
   }
 
@@ -234,4 +235,4 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   public void switchMetricContext(MetricContext context) {
     throw new UnsupportedOperationException();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
new file mode 100644
index 0000000..540b13d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.policy;
+
+import java.util.Set;
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+
+
+/**
+ * A ServicePolicy is checked before compilation happens:
+ * blacklisted (unexpected) edges will not be considered in the compilation process.
+ */
+public interface ServicePolicy {
+
+  /**
+   * After initialization of a {@link ServicePolicy}, this populating method needs to be invoked before
+   * {@link #getBlacklistedEdges()} can return the expected result.
+   *
+   * This requirement exists because when a {@link ServicePolicy} is initialized it is not guaranteed that a
+   * {@link org.jgrapht.graph.WeightedMultigraph} has been constructed, nor can we know whether user-specified edges
+   * and nodes exist in the {@link org.jgrapht.graph.WeightedMultigraph}.
+   * Populating blacklisted edges only makes sense after the graph has been constructed.
+   */
+  void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, FlowEdge> graph);
+
+  /**
+   * Returns all edges that are blacklisted by this policy.
+   */
+  Set<FlowEdge> getBlacklistedEdges();
+
+  /** Registers a {@link ServiceNode} to be considered for blacklisting when the graph is populated. */
+  void addServiceNode(ServiceNode serviceNode);
+
+  /** Registers a single {@link FlowEdge} to be considered for blacklisting when the graph is populated. */
+  void addFlowEdge(FlowEdge flowEdge);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
new file mode 100644
index 0000000..aabc67e
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.policy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Defines {@link ServiceNode}s or {@link FlowEdge}s that should be blacklisted from the Flow-compiler process,
+ * obtained only from configuration, which is the reason it is named as static.
+ *
+ * TODO: DynamicServicePolicy can obtain new blacklist candidates through the Flow-monitoring process, which is
+ * responsible for monitoring the flow execution and reacting accordingly.
+ *
+ * Whether the user specifies {@link ServiceNode}s or {@link FlowEdge}s to blacklist, both end up as a set of
+ * {@link FlowEdge}s that won't be considered when selecting a path for data transformation.
+ */
+@Slf4j
+@Alias("static")
+public class StaticServicePolicy implements ServicePolicy {
+
+  // Resolved set of edges to exclude; filled in by populateBlackListedEdges().
+  @Getter
+  Set<FlowEdge> blacklistedEdges;
+
+  // User-specified nodes: every incoming/outgoing edge of these nodes gets blacklisted.
+  List<ServiceNode> serviceNodes;
+  // User-specified individual edges to blacklist.
+  List<FlowEdge> flowEdges;
+
+  public StaticServicePolicy() {
+    serviceNodes = new ArrayList<>();
+    flowEdges = new ArrayList<>();
+    blacklistedEdges = new HashSet<>();
+  }
+
+  public StaticServicePolicy(List<ServiceNode> serviceNodes, List<FlowEdge> flowEdges) {
+    Preconditions.checkNotNull(serviceNodes);
+    Preconditions.checkNotNull(flowEdges);
+    blacklistedEdges = new HashSet<>();
+    this.serviceNodes = serviceNodes;
+    this.flowEdges = flowEdges;
+  }
+
+  @Override
+  public void addServiceNode(ServiceNode serviceNode) {
+    this.serviceNodes.add(serviceNode);
+  }
+
+  @Override
+  public void addFlowEdge(FlowEdge flowEdge) {
+    this.flowEdges.add(flowEdge);
+  }
+
+  @Override
+  public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, FlowEdge> graph) {
+    // Blacklist every edge incident to a user-specified node, if the node is present in the graph.
+    for (ServiceNode node : serviceNodes) {
+      if (graph.containsVertex(node)) {
+        blacklistedEdges.addAll(graph.incomingEdgesOf(node));
+        blacklistedEdges.addAll(graph.outgoingEdgesOf(node));
+      } else {
+        log.info("The graph " + graph + " does not contain node " + node.toString());
+      }
+    }
+
+    // Blacklist individually-specified edges, if present in the graph.
+    for (FlowEdge flowEdge : flowEdges) {
+      if (graph.containsEdge(flowEdge)) {
+        blacklistedEdges.add(flowEdge);
+      } else {
+        log.info("The graph " + graph + " does not contain edge " + flowEdge.toString());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d223f90..a625f36 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -53,7 +53,7 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.HelixUtils;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
index 1a9b8f6..fb7c1b0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
@@ -32,11 +32,12 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
 
 
 @Alpha
@@ -45,7 +46,7 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
   private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
   private final Config _config;
   private final Logger _log;
-  private final ClassAliasResolver<SpecExecutorInstanceProducer> _aliasResolver;
+  private final ClassAliasResolver<SpecExecutor> _aliasResolver;
 
   public ConfigBasedTopologySpecFactory(Config config) {
     this(config, Optional.<Logger>absent());
@@ -55,7 +56,7 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
     Preconditions.checkNotNull(config, "Config should not be null");
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     _config = config;
-    _aliasResolver = new ClassAliasResolver<>(SpecExecutorInstanceProducer.class);
+    _aliasResolver = new ClassAliasResolver<>(SpecExecutor.class);
   }
 
   @Override
@@ -70,21 +71,21 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
 
     for (String topologyName : topologyNames) {
       Preconditions.checkArgument(_config.hasPath(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + topologyName),
-          "Config does not contain Topology Factory descriptor for Topology" + topologyName);
+          "Config does not contain Topology Factory descriptor for Topology " + topologyName);
       Config topologyConfig = _config.getConfig(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + topologyName);
       String description = ConfigUtils.getString(topologyConfig, ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "NA");
       String version = ConfigUtils.getString(topologyConfig, ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "-1");
 
-      String specExecutorInstanceProducerClass = ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER;
-      if (topologyConfig.hasPath(ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY)) {
-        specExecutorInstanceProducerClass = topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY);
+      String specExecutorClass = ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR;
+      if (topologyConfig.hasPath(ServiceConfigKeys.SPEC_EXECUTOR_KEY)) {
+        specExecutorClass = topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_KEY);
       }
-      SpecExecutorInstanceProducer specExecutorInstanceProducer;
+      SpecExecutor specExecutor;
       try {
-        _log.info("Using SpecExecutorInstanceProducer class name/alias " + specExecutorInstanceProducerClass);
-        specExecutorInstanceProducer = (SpecExecutorInstanceProducer) ConstructorUtils
+        _log.info("Using SpecProducer class name/alias " + specExecutorClass);
+        specExecutor = (SpecExecutor) ConstructorUtils
             .invokeConstructor(Class.forName(_aliasResolver
-                .resolve(specExecutorInstanceProducerClass)), topologyConfig);
+                .resolve(specExecutorClass)), topologyConfig);
       } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
           | ClassNotFoundException e) {
         throw new RuntimeException(e);
@@ -95,10 +96,10 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
           .withConfig(topologyConfig)
           .withDescription(description)
           .withVersion(version)
-          .withSpecExecutorInstanceProducer(specExecutorInstanceProducer);
+          .withSpecExecutor(specExecutor);
       topologySpecs.add(topologySpecBuilder.build());
     }
 
     return topologySpecs;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
new file mode 100644
index 0000000..946fe10
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+import org.apache.gobblin.runtime.api.ServiceNode;
+
+/**
+ * This is a helping class(Basically a wrapper) for Shortest path finding process
+ * to keep the shortest distance from source to an arbitrary node.
+ */
+public class DistancedNode<T extends ServiceNode> {
+
+  /**
+   * The distance between {@link this} node to the src node in the shortest-distance finding problem.
+   */
+  private double distToSrc;
+
+  private T _serviceNode;
+
+
+  /**
+   * Max_Value represents no-connection.
+   */
+  public DistancedNode(T _serviceNode){
+    this(_serviceNode, Double.MAX_VALUE);
+  }
+
+  public DistancedNode(T _serviceNode, double dist){
+    this._serviceNode = _serviceNode;
+    this.distToSrc = dist;
+  }
+
+  public double getDistToSrc(){
+    return this.distToSrc;
+  }
+
+  public void setDistToSrc(double distToSrc){
+    this.distToSrc = distToSrc;
+  }
+
+  public T getNode(){
+    return this._serviceNode;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DistancedNode<?> that = (DistancedNode<?>) o;
+
+    return _serviceNode.equals(that._serviceNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return _serviceNode.hashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
new file mode 100644
index 0000000..1481f78
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+
+import lombok.extern.slf4j.Slf4j;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+
+@Slf4j
+public class FindPathUtils {
+  // Since Author{autumnust@gmail.com} couldn't find the proper way to conduct Library provided by JGraphT
+  // on the customized-edge Graph, here is the raw implementation of Dijkstra algorithm for finding shortest path.
+
+  /**
+   * Given sourceNode and targetNode, find the shortest path and return shortest path.
+   * @return Each edge on this shortest path, in order.
+   *
+   */
+  @VisibleForTesting
+  public static List<FlowEdge> dijkstraBasedPathFindingHelper(ServiceNode sourceNode, ServiceNode targetNode,
+      DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph) {
+    Map<DistancedNode, ArrayList<FlowEdge>> shortestPath = new HashMap<>();
+    Map<DistancedNode, Double> shortestDist = new HashMap<>();
+    PriorityQueue<DistancedNode> pq = new PriorityQueue<>(new Comparator<DistancedNode>() {
+      @Override
+      public int compare(DistancedNode o1, DistancedNode o2) {
+        if (o1.getDistToSrc() < o2.getDistToSrc()) {
+          return -1;
+        } else {
+          return 1;
+        }
+      }
+    });
+    pq.add(new DistancedNode(sourceNode, 0.0));
+
+    Set<FlowEdge> visitedEdge = new HashSet<>();
+
+    while(!pq.isEmpty()) {
+      DistancedNode node = pq.poll();
+      if (node.getNode().getNodeName().equals(targetNode.getNodeName())) {
+        // Searching finished
+        return shortestPath.get(node);
+      }
+
+      Set<FlowEdge> outgoingEdges = weightedGraph.outgoingEdgesOf(node.getNode());
+      for (FlowEdge outGoingEdge:outgoingEdges) {
+        // Since it is a multi-graph problem, should use edge for deduplicaiton instead of vertex.
+        if (visitedEdge.contains(outGoingEdge)) {
+          continue;
+        }
+
+        DistancedNode adjacentNode = new DistancedNode(weightedGraph.getEdgeTarget(outGoingEdge));
+        if (shortestDist.containsKey(adjacentNode)) {
+          adjacentNode.setDistToSrc(shortestDist.get(adjacentNode));
+        }
+
+        double newDist = node.getDistToSrc() + ((LoadBasedFlowEdgeImpl) outGoingEdge).getEdgeLoad();
+
+        if (newDist < adjacentNode.getDistToSrc()) {
+          if (pq.contains(adjacentNode)) {
+            pq.remove(adjacentNode);
+          }
+
+          // Update the shortest path.
+          ArrayList<FlowEdge> path = shortestPath.containsKey(node)
+              ? new ArrayList<>(shortestPath.get(node)) : new ArrayList<>();
+          path.add(outGoingEdge);
+          shortestPath.put(adjacentNode, path);
+          shortestDist.put(adjacentNode, newDist);
+
+          adjacentNode.setDistToSrc(newDist);
+          pq.add(adjacentNode);
+        }
+        visitedEdge.add(outGoingEdge);
+      }
+    }
+    log.error("No path found");
+    return new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
new file mode 100644
index 0000000..f2c1c84
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.UUID;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.slf4j.Logger;
+
+
+@Alpha
+public class HelixUtils {
+
+  /***
+   * Build a Helix Manager (Helix Controller instance).
+   *
+   * @param helixInstanceName the Helix Instance name.
+   * @param helixClusterName the Helix Cluster name.
+   * @param zkConnectionString the ZooKeeper connection string.
+   * @return HelixManager
+   */
+  public static HelixManager buildHelixManager(String helixInstanceName, String helixClusterName, String zkConnectionString) {
+    return HelixManagerFactory.getZKHelixManager(helixClusterName, helixInstanceName,
+        InstanceType.CONTROLLER, zkConnectionString);
+  }
+
+  /**
+   * Create a Helix cluster for the Gobblin Cluster application.
+   *
+   * @param zkConnectionString the ZooKeeper connection string
+   * @param clusterName the Helix cluster name
+   */
+  public static void createGobblinHelixCluster(String zkConnectionString, String clusterName) {
+    createGobblinHelixCluster(zkConnectionString, clusterName, true);
+  }
+
+  /**
+   * Create a Helix cluster for the Gobblin Cluster application.
+   *
+   * @param zkConnectionString the ZooKeeper connection string
+   * @param clusterName the Helix cluster name
+   * @param overwrite true to overwrite exiting cluster, false to reuse existing cluster
+   */
+  public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) {
+    ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString);
+    // Create the cluster and overwrite if it already exists
+    clusterSetup.addCluster(clusterName, overwrite);
+    // Helix 0.6.x requires a configuration property to have the form key=value.
+    String autoJoinConfig = ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true";
+    clusterSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName, autoJoinConfig);
+  }
+
+  /**
+   * Get a Helix instance name.
+   *
+   * @param namePrefix a prefix of Helix instance names
+   * @param instanceId an integer instance ID
+   * @return a Helix instance name that is a concatenation of the given prefix and instance ID
+   */
+  public static String getHelixInstanceName(String namePrefix, int instanceId) {
+    return namePrefix + "_" + instanceId;
+  }
+
+  @VisibleForTesting
+  public static void sendUserDefinedMessage(String messageSubType, String messageVal, String messageId,
+      InstanceType instanceType, HelixManager helixManager, Logger logger) {
+    Criteria criteria = new Criteria();
+    criteria.setInstanceName("%");
+    criteria.setResource("%");
+    criteria.setPartition("%");
+    criteria.setPartitionState("%");
+    criteria.setRecipientInstanceType(instanceType);
+    criteria.setSessionSpecific(true);
+
+    Message message = new Message(Message.MessageType.USER_DEFINE_MSG.toString(), messageId);
+    message.setMsgSubType(messageSubType);
+    message.setAttribute(Message.Attributes.INNER_MESSAGE, messageVal);
+    message.setMsgState(Message.MessageState.NEW);
+    message.setTgtSessionId("*");
+
+    int messagesSent = helixManager.getMessagingService().send(criteria, message);
+    if (messagesSent == 0) {
+      logger.error(String.format("Failed to send the %s message to the participants", message));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 01d0285..289e212 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -49,8 +49,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.HelixUtils;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -139,8 +138,8 @@ public class GobblinServiceHATest {
         "1");
     commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
         "gobblinExecutor");
-    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstanceProducer",
-        "org.apache.gobblin.service.InMemorySpecExecutorInstanceProducer");
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
+        "org.gobblin.service.InMemorySpecExecutor");
     commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
         TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
 
@@ -506,4 +505,4 @@ public class GobblinServiceHATest {
 
     Assert.assertTrue(assertSuccess, "New master should take over all old master jobs.");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 314dc66..b40792e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -103,8 +103,8 @@ public class GobblinServiceManagerTest {
         "1");
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
         "gobblinExecutor");
-    serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstanceProducer",
-        "org.apache.gobblin.service.InMemorySpecExecutorInstanceProducer");
+    serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
+        "org.apache.gobblin.service.InMemorySpecExecutor");
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
         TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
 
@@ -339,4 +339,4 @@ public class GobblinServiceManagerTest {
 
     Assert.fail("Get should have raised a 404 error");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index 0b3dc15..864b238 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.service.modules.core;
 
+
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,14 +40,14 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
-
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 
 public class IdentityFlowToJobSpecCompilerTest {
   private static final Logger logger = LoggerFactory.getLogger(IdentityFlowToJobSpecCompilerTest.class);
@@ -114,14 +115,14 @@ public class IdentityFlowToJobSpecCompilerTest {
     properties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR);
     properties.put("specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
     Config config = ConfigUtils.propertiesToConfig(properties);
-    SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config);
+    SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
 
     TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
         TOPOLOGY_SPEC_STORE_DIR))
         .withConfig(config)
         .withDescription(SPEC_DESCRIPTION)
         .withVersion(SPEC_VERSION)
-        .withSpecExecutorInstanceProducer(specExecutorInstanceProducer);
+        .withSpecExecutor(specExecutorInstance);
     return topologySpecBuilder.build();
   }
 
@@ -152,7 +153,7 @@ public class IdentityFlowToJobSpecCompilerTest {
     return flowSpecBuilder.build();
   }
 
-  public URI computeTopologySpecURI(String parent, String current) {
+  public static URI computeTopologySpecURI(String parent, String current) {
     // Make sure this is relative
     return PathUtils.relativizePath(new Path(current), new Path(parent)).toUri();
   }
@@ -186,7 +187,7 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec();
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
     Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
@@ -215,7 +216,7 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec();
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
+    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
     Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
@@ -244,10 +245,10 @@ public class IdentityFlowToJobSpecCompilerTest {
     FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink");
 
     // Run compiler on flowSpec
-    Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec);
 
     // Assert pre-requisites
     Assert.assertNotNull(specExecutorMapping, "Expected non null mapping.");
     Assert.assertTrue(specExecutorMapping.size() == 0, "Exepected 1 executor for FlowSpec.");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
new file mode 100644
index 0000000..cc722eb
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+import org.jgrapht.graph.WeightedMultigraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
+import org.apache.gobblin.service.modules.flow.MultiHopsFlowToJobSpecCompiler;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
+
+
+// All unit tests here run with the template catalog.
+public class MultiHopsFlowToJobSpecCompilerTest {
+  private static final Logger logger = LoggerFactory.getLogger(MultiHopsFlowToJobSpecCompilerTest.class);
+
+  private static final String TEST_TEMPLATE_CATALOG_PATH = "/tmp/gobblinTestTemplateCatalog_" + System.currentTimeMillis();
+  private static final String TEST_TEMPLATE_CATALOG_URI = "file://" + TEST_TEMPLATE_CATALOG_PATH;
+
+  private static final String TEST_TEMPLATE_NAME = "test.template";
+  private static final String TEST_TEMPLATE_URI = "FS:///test.template";
+
+  // The path to be discovered is TEST_SOURCE_NAME -> TEST_HOP_NAME_A -> TEST_HOP_NAME_B -> TEST_SINK_NAME
+  private static final String TEST_SOURCE_NAME = "testSource";
+  private static final String TEST_HOP_NAME_A = "testHopA";
+  private static final String TEST_HOP_NAME_B = "testHopB";
+  private static final String TEST_HOP_NAME_C = "testHopC";
+  private static final String TEST_SINK_NAME = "testSink";
+  private static final String TEST_FLOW_GROUP = "testFlowGroup";
+  private static final String TEST_FLOW_NAME = "testFlowName";
+
+  private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
+  private static final String SPEC_DESCRIPTION = "Test Orchestrator";
+  private static final String SPEC_VERSION = "1";
+  private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
+  private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2";
+  private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
+
+  private ServiceNode vertexSource;
+  private ServiceNode vertexHopA;
+  private ServiceNode vertexHopB;
+  private ServiceNode vertexHopC;
+  private ServiceNode vertexSink;
+
+  private MultiHopsFlowToJobSpecCompiler compilerWithTemplateCalague;
+  private Map<String, List<URI>> edgeTemplateMap;
+
+
  /**
   * Creates an on-disk template catalog and initializes the compiler under test
   * with a policy path of source -> hopA -> hopB -> sink.
   */
  @BeforeClass
  public void setUp() throws Exception{
    // Create dir for template catalog
    FileUtils.forceMkdir(new File(TEST_TEMPLATE_CATALOG_PATH));

    // Create template to use in test
    List<String> templateEntries = new ArrayList<>();
    templateEntries.add("testProperty1 = \"testValue1\"");
    templateEntries.add("testProperty2 = \"test.Value1\"");
    templateEntries.add("testProperty3 = 100");
    FileUtils.writeLines(new File(TEST_TEMPLATE_CATALOG_PATH + "/" + TEST_TEMPLATE_NAME), templateEntries);

    // Initialize compiler properties with the template catalog location
    Properties compilerWithTemplateCatalogProperties = new Properties();
    compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI);

    // Configure the user-specified data-movement path used by several tests
    String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
    compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH, testPath);

    this.compilerWithTemplateCalague = new MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));

    // Graph vertices shared by the assertions in the tests below
    vertexSource = new BaseServiceNodeImpl(TEST_SOURCE_NAME);
    vertexHopA = new BaseServiceNodeImpl(TEST_HOP_NAME_A);
    vertexHopB = new BaseServiceNodeImpl(TEST_HOP_NAME_B);
    vertexHopC = new BaseServiceNodeImpl(TEST_HOP_NAME_C);
    vertexSink = new BaseServiceNodeImpl(TEST_SINK_NAME);

  }
+
  /** Best-effort removal of the directories created during the tests; failures only warn. */
  @AfterClass
  public void cleanUp() throws Exception {
    // Cleanup Template Catalog
    try {
      cleanUpDir(TEST_TEMPLATE_CATALOG_PATH);
    } catch (Exception e) {
      logger.warn("Could not completely cleanup Template catalog dir");
    }

    // Cleanup TopologySpec Dir
    try {
      cleanUpDir(TOPOLOGY_SPEC_STORE_DIR);
    } catch (Exception e) {
      logger.warn("Could not completely cleanup ToplogySpec catalog dir");
    }

    // Cleanup FlowSpec Dir
    try {
      cleanUpDir(FLOW_SPEC_STORE_DIR);
    } catch (Exception e) {
      logger.warn("Could not completely cleanup FlowSpec catalog dir");
    }
  }
+
  /**
   * Verifies that compiling a flow builds the weighted graph containing the
   * expected vertices and edges for the topology source -> hopA -> hopB -> sink.
   */
  @Test
  public void testWeightedGraphConstruction(){
    FlowSpec flowSpec = initFlowSpec();
    TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
    this.compilerWithTemplateCalague.onAddSpec(topologySpec);

    // invocation of compileFlow triggers the weightedGraph construction
    this.compilerWithTemplateCalague.compileFlow(flowSpec);
    DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = compilerWithTemplateCalague.getWeightedGraph();

    // All four hops of the topology must appear as vertices.
    Assert.assertTrue(weightedGraph.containsVertex(vertexSource));
    Assert.assertTrue(weightedGraph.containsVertex(vertexHopA));
    Assert.assertTrue(weightedGraph.containsVertex(vertexHopB));
    Assert.assertTrue(weightedGraph.containsVertex(vertexSink));

    // Expected edges, rebuilt locally with the same SpecExecutor as the topology.
    FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, topologySpec.getSpecExecutor());
    FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, topologySpec.getSpecExecutor());
    FlowEdge edgeB2Sink = new LoadBasedFlowEdgeImpl(vertexHopB, vertexSink, topologySpec.getSpecExecutor());

    Assert.assertTrue(weightedGraph.containsEdge(edgeSrc2A));
    Assert.assertTrue(weightedGraph.containsEdge(edgeA2B));
    Assert.assertTrue(weightedGraph.containsEdge(edgeB2Sink));

    // edgeEqual performs the deeper comparison beyond containsEdge membership.
    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexSource, vertexHopA), edgeSrc2A));
    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopA, vertexHopB), edgeA2B));
    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopB, vertexSink), edgeB2Sink));

    this.compilerWithTemplateCalague.onDeleteSpec(topologySpec.getUri(), "");
  }
+
  /** Placeholder for compilation along a user-specified path; not yet implemented. */
  @Test
  public void testUserSpecifiedPathCompilation(){
    // TODO
  }
+
  /**
   * Verifies that blacklisting the node "testHopA" causes the service policy to
   * blacklist both edges incident to it on the configured path (src->A and A->B).
   */
  @Test
  public void testServicePolicy(){
    // Initialize compiler with some blacklist properties
    Properties properties = new Properties();
    properties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI);
    String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
    properties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH, testPath);
    properties.setProperty(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES,
        "testHopA");
    MultiHopsFlowToJobSpecCompiler compiler = new MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(properties));


    FlowSpec flowSpec = initFlowSpec();
    TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
    compiler.onAddSpec(topologySpec);

    // invocation of compileFlow triggers the weightedGraph construction
    compiler.compileFlow(flowSpec);

    // Blocking testHopA should blacklist its incoming and outgoing edges.
    compiler.servicePolicy.populateBlackListedEdges(compiler.getWeightedGraph());
    Assert.assertEquals(compiler.servicePolicy.getBlacklistedEdges().size(), 2);

    FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, topologySpec.getSpecExecutor());
    FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, topologySpec.getSpecExecutor());

    Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeSrc2A));
    Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeA2B));

  }
+
+  @Test (dependsOnMethods = "testWeightedGraphConstruction")
+  public void testDijkstraPathFinding(){
+    // Two overlapping topologies feed the graph:
+    //   topology 1: Src -> A -> B -> Sink
+    //   topology 2: Src -> B -> C -> Sink
+    FlowSpec flowSpec = initFlowSpec();
+    TopologySpec firstTopology = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
+    TopologySpec secondTopology = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR_SECOND, TEST_SOURCE_NAME, TEST_HOP_NAME_B, TEST_HOP_NAME_C, TEST_SINK_NAME);
+    this.compilerWithTemplateCalague.onAddSpec(firstTopology);
+    this.compilerWithTemplateCalague.onAddSpec(secondTopology);
+
+    // Get the edges -> change their weights -> materialize the changes back into the
+    // graph -> compile again -> assert on the chosen path.
+    this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    DirectedWeightedMultigraph<ServiceNode, FlowEdge> graph = compilerWithTemplateCalague.getWeightedGraph();
+    graph.setEdgeWeight(graph.getEdge(vertexHopA, vertexHopB), 1.99);
+    graph.setEdgeWeight(graph.getEdge(vertexHopB, vertexHopC), 0.1);
+    graph.setEdgeWeight(graph.getEdge(vertexHopC, vertexSink), 0.2);
+
+    // Best route: Src - B(1) - C(0.1) - sink (0.2)
+    this.compilerWithTemplateCalague.compileFlow(flowSpec);
+    List<FlowEdge> shortestPath = dijkstraBasedPathFindingHelper(vertexSource, vertexSink, graph);
+
+    Assert.assertEquals(shortestPath.get(0).getEdgeIdentity(), graph.getEdge(vertexSource, vertexHopB).getEdgeIdentity());
+    Assert.assertEquals(shortestPath.get(1).getEdgeIdentity(), graph.getEdge(vertexHopB, vertexHopC).getEdgeIdentity());
+    Assert.assertEquals(shortestPath.get(2).getEdgeIdentity(), graph.getEdge(vertexHopC, vertexSink).getEdgeIdentity());
+
+    // Remove both topologies so later tests start from an empty catalog.
+    this.compilerWithTemplateCalague.onDeleteSpec(firstTopology.getUri(), "");
+    this.compilerWithTemplateCalague.onDeleteSpec(secondTopology.getUri(), "");
+  }
+
+  // Builds a TopologySpec whose executor capabilities chain the given node names in
+  // order, e.g. (Src, A, B, Dest) -> "Src:A,A:B,B:Dest".
+  private TopologySpec initTopologySpec(String storeDir, String ... args) {
+    // At least a source and a destination are required to form one capability pair;
+    // the original build-then-strip approach threw StringIndexOutOfBoundsException here.
+    Assert.assertTrue(args.length >= 2, "initTopologySpec needs at least a source and a destination node");
+    Properties properties = new Properties();
+    properties.put("specStore.fs.dir", storeDir);
+    // Join adjacent node pairs with ',' directly instead of appending a trailing
+    // comma and stripping it afterwards.
+    StringBuilder capabilities = new StringBuilder();
+    for (int i = 0; i < args.length - 1; i++) {
+      if (i > 0) {
+        capabilities.append(',');
+      }
+      capabilities.append(args[i]).append(':').append(args[i + 1]);
+    }
+    properties.put("specExecInstance.capabilities", capabilities.toString());
+    properties.put("executorAttrs", new Properties());
+    Config config = ConfigUtils.propertiesToConfig(properties);
+    SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
+
+    TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(
+        IdentityFlowToJobSpecCompilerTest.computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
+            storeDir))
+        .withConfig(config)
+        .withDescription(SPEC_DESCRIPTION)
+        .withVersion(SPEC_VERSION)
+        .withSpecExecutor(specExecutorInstance);
+    return topologySpecBuilder.build();
+  }
+
+  // Convenience overload: builds a FlowSpec with the default test flow identity,
+  // source, and sink used throughout this test class.
+  private FlowSpec initFlowSpec() {
+    return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, TEST_SINK_NAME);
+  }
+
+  // Builds a FlowSpec for the given flow identity, wired from source to destination,
+  // using the test template catalog.
+  private FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) {
+    Properties properties = new Properties();
+    properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
+    properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
+    properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+    properties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
+    properties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination);
+    Config config = ConfigUtils.propertiesToConfig(properties);
+
+    try {
+      return FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
+          FLOW_SPEC_STORE_DIR))
+          .withConfig(config)
+          .withDescription("dummy description")
+          .withVersion("1")
+          .withTemplate(new URI(TEST_TEMPLATE_URI))
+          .build();
+    } catch (URISyntaxException e) {
+      // TEST_TEMPLATE_URI is a class constant; a syntax error here is a programming error.
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Recursively removes the given spec-store directory, if present.
+  private void cleanUpDir(String dir) throws Exception {
+    File target = new File(dir);
+    if (!target.exists()) {
+      return;
+    }
+    FileUtils.deleteDirectory(target);
+  }
+
+  // Criteria for FlowEdge to be equal in testing context: same edge identity AND the
+  // same edge load.
+  // NOTE(review): the load comparison uses ==; if getEdgeLoad() returns a boxed Double
+  // this compares references rather than values — confirm it returns a primitive double.
+  private boolean edgeEqual(FlowEdge a, FlowEdge b){
+    return (a.getEdgeIdentity().equals(b.getEdgeIdentity()) &&
+        ((LoadBasedFlowEdgeImpl)a).getEdgeLoad() == ((LoadBasedFlowEdgeImpl)b).getEdgeLoad());
+  }
+
+  // Use this function to reset edgeTemplateMap so that every edge currently in the
+  // given graph maps to a single-element list containing the supplied template URI.
+  private void populateTemplateMap(WeightedMultigraph<ServiceNode, FlowEdge> weightedGraph, URI exempliedURI){
+    this.edgeTemplateMap.clear();
+    Set<FlowEdge> allEdges = weightedGraph.edgeSet();
+    for ( FlowEdge edge : allEdges ) {
+      this.edgeTemplateMap.put(edge.getEdgeIdentity(), Arrays.asList(exempliedURI)) ;
+    }
+  }
+
+  public static URI computeTopologySpecURI(String parent, String current) {
+    // Make sure this is relative
+    return PathUtils.relativizePath(new Path(current), new Path(parent)).toUri();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index eb9974a..a933e85 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,13 +42,10 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -120,14 +119,14 @@ public class OrchestratorTest {
     properties.put("specExecInstance.capabilities", "source:destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
-    SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config);
+    SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
 
     TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
         TOPOLOGY_SPEC_STORE_DIR))
         .withConfig(config)
         .withDescription(SPEC_DESCRIPTION)
         .withVersion(SPEC_VERSION)
-        .withSpecExecutorInstanceProducer(specExecutorInstanceProducer);
+        .withSpecExecutor(specExecutorInstance);
     return topologySpecBuilder.build();
   }
 
@@ -139,8 +138,6 @@ public class OrchestratorTest {
     properties.put("gobblin.flow.destinationIdentifier", "destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
-    SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config);
-
     FlowSpec.Builder flowSpecBuilder = null;
     try {
       flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
@@ -209,10 +206,10 @@ public class OrchestratorTest {
 
   @Test (dependsOnMethods = "createTopologySpec")
   public void createFlowSpec() throws Exception {
-    // Since only 1 Topology with 1 SpecExecutorInstanceProducer has been added in previous test
+    // Since only 1 Topology with 1 SpecProducer has been added in previous test
     // .. it should be available and responsible for our new FlowSpec
     IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
-    SpecExecutorInstanceProducer sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutorInstanceProducer();
+    SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
     Collection<Spec> specs = flowCatalog.getSpecs();
@@ -225,7 +222,7 @@ public class OrchestratorTest {
     // Make sure FlowCatalog is empty
     Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
     // Make sure FlowCatalog Listener is empty
-    Assert.assertTrue(((List)(sei.listSpecs().get())).size() == 0, "SpecExecutorInstanceProducer should not know about "
+    Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should not know about "
         + "any Flow before addition");
 
     // Create and add Spec
@@ -243,7 +240,7 @@ public class OrchestratorTest {
     // Make sure FlowCatalog has the added Flow
     Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
     // Orchestrator is a no-op listener for any new FlowSpecs
-    Assert.assertTrue(((List)(sei.listSpecs().get())).size() == 0, "SpecExecutorInstanceProducer should contain 0 "
+    Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should contain 0 "
         + "Spec after addition");
   }
 
@@ -251,7 +248,7 @@ public class OrchestratorTest {
   public void deleteFlowSpec() throws Exception {
     // Since only 1 Flow has been added in previous test it should be available
     IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
-    SpecExecutorInstanceProducer sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutorInstanceProducer();
+    SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
     Collection<Spec> specs = flowCatalog.getSpecs();
@@ -264,8 +261,8 @@ public class OrchestratorTest {
     // Make sure FlowCatalog has the previously added Flow
     Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Flow that was added in last test");
     // Orchestrator is a no-op listener for any new FlowSpecs, so no FlowSpecs should be around
-    int specsInSEI = ((List)(sei.listSpecs().get())).size();
-    Assert.assertTrue(specsInSEI == 0, "SpecExecutorInstanceProducer should contain 0 "
+    int specsInSEI = ((List)(sei.getProducer().get().listSpecs().get())).size();
+    Assert.assertTrue(specsInSEI == 0, "SpecProducer should contain 0 "
         + "Spec after addition because Orchestrator is a no-op listener for any new FlowSpecs");
 
     // Remove the flow
@@ -283,8 +280,8 @@ public class OrchestratorTest {
     // Make sure FlowCatalog has the Flow removed
     Assert.assertTrue(specs.size() == 0, "Spec store should not contain Spec after deletion");
     // Make sure FlowCatalog Listener knows about the deletion
-    specsInSEI = ((List)(sei.listSpecs().get())).size();
-    Assert.assertTrue(specsInSEI == 0, "SpecExecutorInstanceProducer should not contain "
+    specsInSEI = ((List)(sei.getProducer().get().listSpecs().get())).size();
+    Assert.assertTrue(specsInSEI == 0, "SpecProducer should not contain "
         + "Spec after deletion");
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java
index c28e97b..36193dc 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java
@@ -54,8 +54,8 @@ public class ConfigBasedTopologySpecFactoryTest {
     properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "Topology for cluster");
     properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "1");
     properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_URI_KEY, "/mySpecs/" + topology1);
-    properties.put(topology1Prefix + ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY,
-        ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER);
+    properties.put(topology1Prefix + ServiceConfigKeys.SPEC_EXECUTOR_KEY,
+        ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR);
     properties.put(topology1Prefix + ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY, "salesforce:nosql");
 
     // Topology Azkaban1 properties
@@ -63,8 +63,8 @@ public class ConfigBasedTopologySpecFactoryTest {
     properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "Topology for Azkaban");
     properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "2");
     properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_URI_KEY, "/mySpecs/" + topology2);
-    properties.put(topology2Prefix + ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY,
-        ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER);
+    properties.put(topology2Prefix + ServiceConfigKeys.SPEC_EXECUTOR_KEY,
+        ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR);
     properties.put(topology2Prefix + ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY, "nosql:hdfs");
 
     _config = ConfigUtils.propertiesToConfig(properties);
@@ -94,4 +94,4 @@ public class ConfigBasedTopologySpecFactoryTest {
         "Version did not match with construction");
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 24b2473..cde546c 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -91,6 +91,7 @@ ext.externalDependency = [
     "jacksonMapper": "org.codehaus.jackson:jackson-mapper-asl:1.9.13",
     "jasypt": "org.jasypt:jasypt:1.9.2",
     "jodaTime": "joda-time:joda-time:2.9.3",
+    "jgrapht": "org.jgrapht:jgrapht-core:0.9.2",
     "metricsCore": "io.dropwizard.metrics:metrics-core:" + dropwizardMetricsVersion,
     "metricsJvm": "io.dropwizard.metrics:metrics-jvm:" + dropwizardMetricsVersion,
     "metricsGraphite": "io.dropwizard.metrics:metrics-graphite:" + dropwizardMetricsVersion,


Mime
View raw message