gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [2/4] incubator-gobblin git commit: [GOBBLIN-3] Multi-hop flow compiler implementation
Date Tue, 12 Sep 2017 09:30:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
new file mode 100644
index 0000000..e0a235c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+
+
+/**
+ * An abstract implementation of SpecExecutor without specifying communication mechanism.
+ *
+ * Normally in the implementation of {@link AbstractSpecExecutor}, it is necessary to specify:
+ * {@link SpecProducer}
+ * {@link SpecConsumer}
+ * {@link Closer}
+ */
+public abstract class AbstractSpecExecutor extends AbstractIdleService implements SpecExecutor {
+
+  private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+  private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
+
+  protected final transient Logger log;
+
+  // Executor Instance identifier
+  protected final URI specExecutorInstanceUri;
+
+  @SuppressWarnings(justification = "No bug", value = "SE_BAD_FIELD")
+  protected final Config config;
+
+  protected final Map<ServiceNode, ServiceNode> capabilities;
+
+  /**
+   * Most producer implementations (like SimpleKafkaSpecProducer) implement {@link java.io.Closeable},
+   * so they need to be registered and closed while the AbstractSpecExecutor is up.
+   * {@link Closer} is mainly used for managing {@link SpecProducer} and {@link SpecConsumer}.
+   */
+  protected Optional<Closer> optionalCloser;
+
+  public AbstractSpecExecutor(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  public AbstractSpecExecutor(Config config, GobblinInstanceEnvironment env) {
+    this(config, Optional.of(env.getLog()));
+  }
+
+  public AbstractSpecExecutor(Config config, Optional<Logger> log) {
+
+    /**
+     * Since the URI is regarded as the unique identifier for {@link SpecExecutor} (used in the equals method),
+     * it is dangerous to use the default URI.
+     */
+    if (!config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY)) {
+      if (log.isPresent()) {
+        log.get().warn("The SpecExecutor doesn't specify URI, using the default one.");
+      }
+    }
+
+    try {
+      specExecutorInstanceUri =
+          new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, "NA"));
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
+    this.config = config;
+    this.capabilities = Maps.newHashMap();
+    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
+      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
+      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
+      for (String capability : capabilities) {
+        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
+        Preconditions.checkArgument(currentCapability.size() == 2,
+            "Only one source:destination pair is supported " + "per capability, found: " + currentCapability);
+        this.capabilities.put(new BaseServiceNodeImpl(currentCapability.get(0)),
+            new BaseServiceNodeImpl(currentCapability.get(1)));
+      }
+    }
+    optionalCloser = Optional.absent();
+  }
+
+  @Override
+  public URI getUri() {
+    return specExecutorInstanceUri;
+  }
+
+  /**
+   * Attributes describe the technology that a {@link SpecExecutor} is using and
+   * the physical location that it runs on.
+   *
+   * These attributes are supposed to be static and read-only.
+   */
+  @Override
+  public Config getAttrs() {
+    Preconditions.checkArgument(this.config.hasPath(ServiceConfigKeys.ATTRS_PATH_IN_CONFIG),
+        "Input configuration doesn't contains SpecExecutor Attributes path.");
+    return this.config.getConfig(ServiceConfigKeys.ATTRS_PATH_IN_CONFIG);
+  }
+
+  @Override
+  public Future<Config> getConfig() {
+    return new CompletedFuture(this.config, null);
+  }
+
+  @Override
+  public Future<? extends Map<ServiceNode, ServiceNode>> getCapabilities() {
+    return new CompletedFuture(this.capabilities, null);
+  }
+
+  /**
+   * Two {@link SpecExecutor}s with the same {@link #specExecutorInstanceUri}
+   * should be considered as the same {@link SpecExecutor}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    AbstractSpecExecutor that = (AbstractSpecExecutor) o;
+
+    return specExecutorInstanceUri.equals(that.specExecutorInstanceUri);
+  }
+
+  @Override
+  public int hashCode() {
+    return specExecutorInstanceUri.hashCode();
+  }
+
+  /**
+   * @return In default implementation we just return 'Healthy'.
+   */
+  @Override
+  public Future<String> getHealth() {
+    return new CompletedFuture("Healthy", null);
+  }
+
+  abstract protected void startUp() throws Exception;
+
+  abstract protected void shutDown() throws Exception;
+
+  abstract public Future<? extends SpecProducer> getProducer();
+
+  abstract public Future<String> getDescription();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
new file mode 100644
index 0000000..dcc0c3b
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.Setter;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.Getter;
+
+/**
+ * A base implementation for {@link ServiceNode} with default secured setting.
+ */
+public class BaseServiceNodeImpl implements ServiceNode {
+
+  @Getter
+  public String nodeName;
+
+  /**
+   * Contains read-only properties of a {@link ServiceNode}
+   */
+  @Getter
+  public Config nodeProps;
+
+  /**
+   * One of mutable properties of Node.
+   * Initialization: Obtained from {@link ServiceConfigKeys}.
+   * Getter/Setter: Simply through {@link BaseServiceNodeImpl}.
+   */
+  @Getter
+  @Setter
+  private boolean isNodeSecure;
+
+  /**
+   * For nodes missing configuration
+   * @param nodeName
+   */
+  public BaseServiceNodeImpl(String nodeName) {
+    this(nodeName, new Properties());
+  }
+
+  public BaseServiceNodeImpl(String nodeName, Properties props) {
+    Preconditions.checkNotNull(nodeName);
+    this.nodeName = nodeName;
+    isNodeSecure = Boolean.parseBoolean
+        (props.getProperty(ServiceConfigKeys.NODE_SECURITY_KEY, ServiceConfigKeys.DEFAULT_NODE_SECURITY));
+    nodeProps = ConfigUtils.propertiesToConfig(props);
+  }
+
+  /**
+   * By default each node is acceptable to use in path-finding.
+   */
+  @Override
+  public boolean isNodeEnabled() {
+    return true;
+  }
+
+  /**
+   * Two nodes are considered equal when their node names are equal; the configuration is not compared.
+   * Node name is the identifier for the node.
+   * */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BaseServiceNodeImpl that = (BaseServiceNodeImpl) o;
+
+    return nodeName.equals(that.nodeName);
+  }
+
+  @Override
+  public int hashCode() {
+    return nodeName.hashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
new file mode 100644
index 0000000..e0be4e9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
+import java.net.URI;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+
+
+
+/**
+ * A {@link SpecExecutor} implementation that keeps provisioned {@link Spec}s in memory.
+ * Therefore there's no necessity to install {@link SpecConsumer} in this case.
+ */
+public class InMemorySpecExecutor extends AbstractSpecExecutor {
+  // Communication mechanism components.
+  // Not specifying final for further extension based on this implementation.
+  private SpecProducer<Spec> inMemorySpecProducer;
+
+  public InMemorySpecExecutor(Config config){
+    this(config, Optional.absent());
+  }
+
+  public InMemorySpecExecutor(Config config, GobblinInstanceEnvironment env){
+    this(config, Optional.of(env.getLog()));
+  }
+
+  public InMemorySpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    inMemorySpecProducer = new InMemorySpecProducer(config);
+  }
+
+  /**
+   * A factory method that creates a SpecExecutor specifying only the URI, for uniqueness.
+   * @param uri
+   */
+  public static SpecExecutor createDummySpecExecutor(URI uri) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, uri.toString());
+    return new InMemorySpecExecutor(ConfigFactory.parseProperties(properties));
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture("InMemory SpecExecutor", null);
+  }
+
+  @Override
+  public Future<? extends SpecProducer> getProducer(){
+    return new CompletedFuture(this.inMemorySpecProducer, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // Nothing to do in the in-memory implementation.
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // Nothing to do in the in-memory implementation.
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java
deleted file mode 100644
index 77faaa7..0000000
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.spec_executorInstance;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import edu.umd.cs.findbugs.annotations.SuppressWarnings;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
-import org.apache.gobblin.util.CompletedFuture;
-
-
-public class InMemorySpecExecutorInstanceProducer implements SpecExecutorInstanceProducer<Spec>, Serializable {
-
-  private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
-  private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
-
-  private static final long serialVersionUID = 6106269076155338045L;
-
-  protected final transient Logger log;
-  protected final Map<URI, Spec> provisionedSpecs;
-  @SuppressWarnings (justification="No bug", value="SE_BAD_FIELD")
-  protected final Config config;
-  protected final Map<String, String> capabilities;
-
-  public InMemorySpecExecutorInstanceProducer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  public InMemorySpecExecutorInstanceProducer(Config config, GobblinInstanceEnvironment env) {
-    this(config, Optional.of(env.getLog()));
-  }
-
-  public InMemorySpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
-    this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    this.config = config;
-    this.provisionedSpecs = Maps.newHashMap();
-    this.capabilities = Maps.newHashMap();
-    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
-      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
-      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
-      for (String capability : capabilities) {
-        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
-        Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
-            + "per capability, found: " + currentCapability);
-        this.capabilities.put(currentCapability.get(0), currentCapability.get(1));
-      }
-    }
-  }
-
-  @Override
-  public URI getUri() {
-    try {
-      return new URI("InMemorySpecExecutorInstanceProducer");
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Future<String> getDescription() {
-    return new CompletedFuture("InMemory SpecExecutorInstanceProducer", null);
-  }
-
-  @Override
-  public Future<Config> getConfig() {
-    return new CompletedFuture(this.config, null);
-  }
-
-  @Override
-  public Future<String> getHealth() {
-    return new CompletedFuture("Healthy", null);
-  }
-
-  @Override
-  public Future<? extends Map<String, String>> getCapabilities() {
-    return new CompletedFuture(this.capabilities, null);
-  }
-
-  @Override
-  public Future<?> addSpec(Spec addedSpec) {
-    provisionedSpecs.put(addedSpec.getUri(), addedSpec);
-    log.info(String.format("Added Spec: %s with Uri: %s for execution on this executor.", addedSpec, addedSpec.getUri()));
-
-    return new CompletedFuture(Boolean.TRUE, null);
-  }
-
-  @Override
-  public Future<?> updateSpec(Spec updatedSpec) {
-    if (!provisionedSpecs.containsKey(updatedSpec.getUri())) {
-      throw new RuntimeException("Spec not found: " + updatedSpec.getUri());
-    }
-    provisionedSpecs.put(updatedSpec.getUri(), updatedSpec);
-    log.info(String.format("Updated Spec: %s with Uri: %s for execution on this executor.", updatedSpec, updatedSpec.getUri()));
-
-    return new CompletedFuture(Boolean.TRUE, null);
-  }
-
-  @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
-    if (!provisionedSpecs.containsKey(deletedSpecURI)) {
-      throw new RuntimeException("Spec not found: " + deletedSpecURI);
-    }
-    provisionedSpecs.remove(deletedSpecURI);
-    log.info(String.format("Deleted Spec with Uri: %s from this executor.", deletedSpecURI));
-
-    return new CompletedFuture(Boolean.TRUE, null);
-  }
-
-  @Override
-  public Future<? extends List<Spec>> listSpecs() {
-    return new CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
new file mode 100644
index 0000000..80f64ec
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable {
+  private final Map<URI, Spec> provisionedSpecs;
+  private transient Config config;
+
+  private static final long serialVersionUID = 6106269076155338045L;
+
+  public InMemorySpecProducer(Config config) {
+    this.config = config;
+    this.provisionedSpecs = Maps.newHashMap();
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    provisionedSpecs.put(addedSpec.getUri(), addedSpec);
+    log.info(String.format("Added Spec: %s with Uri: %s for execution on this executor.", addedSpec, addedSpec.getUri()));
+
+    return new CompletedFuture(Boolean.TRUE, null);
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    if (!provisionedSpecs.containsKey(updatedSpec.getUri())) {
+      throw new RuntimeException("Spec not found: " + updatedSpec.getUri());
+    }
+    provisionedSpecs.put(updatedSpec.getUri(), updatedSpec);
+    log.info(String.format("Updated Spec: %s with Uri: %s for execution on this executor.", updatedSpec, updatedSpec.getUri()));
+
+    return new CompletedFuture(Boolean.TRUE, null);
+  }
+
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+    if (!provisionedSpecs.containsKey(deletedSpecURI)) {
+      throw new RuntimeException("Spec not found: " + deletedSpecURI);
+    }
+    provisionedSpecs.remove(deletedSpecURI);
+    log.info(String.format("Deleted Spec with Uri: %s from this executor.", deletedSpecURI));
+
+    return new CompletedFuture(Boolean.TRUE, null);
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    return new CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index c42e605..73c1f46 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -38,13 +38,13 @@ import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
 public class FlowCatalogTest {
@@ -89,7 +89,7 @@ public class FlowCatalogTest {
     properties.put("specExecInstance.capabilities", "source:destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
-    SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config);
+    SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
 
     FlowSpec.Builder flowSpecBuilder = null;
     try {
@@ -171,4 +171,4 @@ public class FlowCatalogTest {
     URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
     return uri;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
index 594c755..48fba40 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +38,10 @@ import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
@@ -89,13 +89,13 @@ public class TopologyCatalogTest {
     properties.put("specExecInstance.capabilities", "source:destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
-    SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config);
+    SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
 
     TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI())
         .withConfig(config)
         .withDescription(SPEC_DESCRIPTION)
         .withVersion(SPEC_VERSION)
-        .withSpecExecutorInstanceProducer(specExecutorInstanceProducer);
+        .withSpecExecutor(specExecutorInstanceProducer);
     return topologySpecBuilder.build();
   }
 
@@ -166,4 +166,4 @@ public class TopologyCatalogTest {
     URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
     return uri;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 642d818..0b461bd 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -58,6 +58,7 @@ dependencies {
   compile externalDependency.javaxInject
   compile externalDependency.jgit
   compile externalDependency.jodaTime
+  compile externalDependency.jgrapht
   compile externalDependency.kafka08
   compile externalDependency.log4j
   compile externalDependency.lombok

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java
deleted file mode 100644
index 4f01623..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.UUID;
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.Message;
-import org.apache.helix.tools.ClusterSetup;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.slf4j.Logger;
-
-
-@Alpha
-public class HelixUtils {
-
-  /***
-   * Build a Helix Manager (Helix Controller instance).
-   *
-   * @param helixInstanceName the Helix Instance name.
-   * @param helixClusterName the Helix Cluster name.
-   * @param zkConnectionString the ZooKeeper connection string.
-   * @return HelixManager
-   */
-  public static HelixManager buildHelixManager(String helixInstanceName, String helixClusterName, String zkConnectionString) {
-    return HelixManagerFactory.getZKHelixManager(helixClusterName, helixInstanceName,
-        InstanceType.CONTROLLER, zkConnectionString);
-  }
-
-  /**
-   * Create a Helix cluster for the Gobblin Cluster application.
-   *
-   * @param zkConnectionString the ZooKeeper connection string
-   * @param clusterName the Helix cluster name
-   */
-  public static void createGobblinHelixCluster(String zkConnectionString, String clusterName) {
-    createGobblinHelixCluster(zkConnectionString, clusterName, true);
-  }
-
-  /**
-   * Create a Helix cluster for the Gobblin Cluster application.
-   *
-   * @param zkConnectionString the ZooKeeper connection string
-   * @param clusterName the Helix cluster name
-   * @param overwrite true to overwrite exiting cluster, false to reuse existing cluster
-   */
-  public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) {
-    ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString);
-    // Create the cluster and overwrite if it already exists
-    clusterSetup.addCluster(clusterName, overwrite);
-    // Helix 0.6.x requires a configuration property to have the form key=value.
-    String autoJoinConfig = ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true";
-    clusterSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName, autoJoinConfig);
-  }
-
-  /**
-   * Get a Helix instance name.
-   *
-   * @param namePrefix a prefix of Helix instance names
-   * @param instanceId an integer instance ID
-   * @return a Helix instance name that is a concatenation of the given prefix and instance ID
-   */
-  public static String getHelixInstanceName(String namePrefix, int instanceId) {
-    return namePrefix + "_" + instanceId;
-  }
-
-  @VisibleForTesting
-  public static void sendUserDefinedMessage(String messageSubType, String messageVal, String messageId,
-      InstanceType instanceType, HelixManager helixManager, Logger logger) {
-    Criteria criteria = new Criteria();
-    criteria.setInstanceName("%");
-    criteria.setResource("%");
-    criteria.setPartition("%");
-    criteria.setPartitionState("%");
-    criteria.setRecipientInstanceType(instanceType);
-    criteria.setSessionSpecific(true);
-
-    Message message = new Message(Message.MessageType.USER_DEFINE_MSG.toString(), messageId);
-    message.setMsgSubType(messageSubType);
-    message.setAttribute(Message.Attributes.INNER_MESSAGE, messageVal);
-    message.setMsgState(Message.MessageState.NEW);
-    message.setTgtSessionId("*");
-
-    int messagesSent = helixManager.getMessagingService().send(criteria, message);
-    if (messagesSent == 0) {
-      logger.error(String.format("Failed to send the %s message to the participants", message));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
deleted file mode 100644
index 8ea19c4..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
-import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
-import org.apache.gobblin.service.modules.topology.ConfigBasedTopologySpecFactory;
-
-@Alpha
-public class ServiceConfigKeys {
-
-  private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
-
-  // Gobblin Service Manager Keys
-  public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
-  public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
-  public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
-  public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";
-  public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
-  public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
-  public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
-
-  // Helix / ServiceScheduler Keys
-  public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name";
-  public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_SERVICE_PREFIX + "zk.connection.string";
-  public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name";
-  public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName";
-  public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec";
-
-  // Helix message sub types for FlowSpec
-  public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
-  public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE";
-  public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE";
-
-  // Flow Compiler Keys
-  public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
-  public static final String DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS = IdentityFlowToJobSpecCompiler.class.getCanonicalName();
-
-  // Flow specific Keys
-  public static final String FLOW_SOURCE_IDENTIFIER_KEY = "gobblin.flow.sourceIdentifier";
-  public static final String FLOW_DESTINATION_IDENTIFIER_KEY = "gobblin.flow.destinationIdentifier";
-
-  // Command line options
-  public static final String SERVICE_NAME_OPTION_NAME = "service_name";
-
-  // Topology Factory Keys (for overall factory)
-  public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory.";
-  public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY = ConfigBasedTopologySpecFactory.class.getCanonicalName();
-  public static final String TOPOLOGYSPEC_FACTORY_KEY = TOPOLOGY_FACTORY_PREFIX + "class";
-  public static final String TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY = TOPOLOGY_FACTORY_PREFIX + "topologyNames";
-
-  // Topology Factory Keys (for individual topologies)
-  public static final String TOPOLOGYSPEC_DESCRIPTION_KEY = "description";
-  public static final String TOPOLOGYSPEC_VERSION_KEY = "version";
-  public static final String TOPOLOGYSPEC_URI_KEY = "uri";
-  public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER = InMemorySpecExecutorInstanceProducer.class.getCanonicalName();
-  public static final String SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY = "specExecutorInstanceProducer.class";
-
-  // Template Catalog Keys
-  public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
-
-  // Logging
-  public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index c2591e1..598371d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -83,7 +83,7 @@ import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowConfigsResource;
-import org.apache.gobblin.service.HelixUtils;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
new file mode 100644
index 0000000..26f4463
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCompiler;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+
+import lombok.Getter;
+import lombok.Setter;
+
+// Provide base implementation for constructing multi-hops route.
+@Alpha
+public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
+
+  // Since {@link SpecCompiler} is a {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
+  // to these data structures.
+  @Getter
+  @Setter
+  protected final Map<URI, TopologySpec> topologySpecMap;
+
+
+  /**
+   * Mapping between each FlowEdge and a list of applicable Templates.
+   * Compiler should obtain this Map info from higher level component.
+   * since {@link TopologySpec} doesn't contain Templates.
+   * Key: EdgeIdentifier from {@link org.apache.gobblin.runtime.api.FlowEdge#getEdgeIdentity()}
+   * Value: List of template URI.
+   */
+  // TODO: Define how template info are instantiated. ETL-6217
+  @Getter
+  @Setter
+  protected final Map<String, List<URI>> edgeTemplateMap;
+
+
+  protected final Config config;
+  protected final Logger log;
+  protected final Optional<FSJobCatalog> templateCatalog;
+
+  protected final MetricContext metricContext;
+  @Getter
+  protected Optional<Meter> flowCompilationSuccessFulMeter;
+  @Getter
+  protected Optional<Meter> flowCompilationFailedMeter;
+  @Getter
+  protected Optional<Timer> flowCompilationTimer;
+
+  public BaseFlowToJobSpecCompiler(Config config){
+    this(config,true);
+  }
+
+  public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){
+    this(config, Optional.<Logger>absent(), instrumentationEnabled); // honor the caller's flag instead of always enabling instrumentation
+  }
+
+  public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){
+    this(config, log,true);
+  }
+
+  public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
+    this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
+    if (instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
+      this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
+      this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
+      this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
+    }
+    else {
+      this.metricContext = null;
+      this.flowCompilationSuccessFulMeter = Optional.absent();
+      this.flowCompilationFailedMeter = Optional.absent();
+      this.flowCompilationTimer = Optional.absent();
+    }
+
+    this.topologySpecMap = Maps.newConcurrentMap();
+    this.edgeTemplateMap = Maps.newConcurrentMap();
+    this.config = config;
+
+    /***
+     * ETL-5996
+     * For multi-tenancy, the following needs to be added:
+     * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
+     * 2. Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that
+     */
+    try {
+      if (this.config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
+          && StringUtils.isNotBlank(this.config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
+        Config templateCatalogCfg = config
+            .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+                this.config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+        this.templateCatalog = Optional.of(new FSJobCatalog(templateCatalogCfg));
+      } else {
+        this.templateCatalog = Optional.absent();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Could not initialize FlowCompiler because of "
+          + "TemplateCatalog initialization failure", e);
+    }
+  }
+
+  @Override
+  public synchronized void onAddSpec(Spec addedSpec) {
+    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+  }
+
+  @Override
+  public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+    if (topologySpecMap.containsKey(deletedSpecURI)) {
+      topologySpecMap.remove(deletedSpecURI);
+    }
+  }
+
+  @Override
+  public synchronized void onUpdateSpec(Spec updatedSpec) {
+    topologySpecMap.put(updatedSpec.getUri(), (TopologySpec) updatedSpec);
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return null != this.metricContext;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state){
+      return Collections.emptyList();
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<URI, TopologySpec> getTopologySpecMap() {
+    return this.topologySpecMap;
+  }
+
+  public abstract Map<Spec, SpecExecutor> compileFlow(Spec spec);
+
+  /**
+   * Naive implementation of generating jobSpec, which fetches the first available template,
+   * in an exemplified single-hop FlowCompiler implementation.
+   * @param flowSpec
+   * @return
+   */
+  protected JobSpec jobSpecGenerator(FlowSpec flowSpec) {
+    JobSpec jobSpec;
+    JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri())
+        .withConfig(flowSpec.getConfig())
+        .withDescription(flowSpec.getDescription())
+        .withVersion(flowSpec.getVersion());
+
+    if (flowSpec.getTemplateURIs().isPresent() && templateCatalog.isPresent()) {
+      // Only first template uri will be honored for Identity
+      jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next());
+      try {
+        jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
+        log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+      } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
+        throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
+      }
+    } else {
+      jobSpec = jobSpecBuilder.build();
+      log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+    }
+
+    // Remove schedule
+    jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+    // Add job.name and job.group
+    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
+      jobSpec.setConfig(jobSpec.getConfig()
+          .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY)));
+    }
+    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
+      jobSpec.setConfig(jobSpec.getConfig()
+          .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
+    }
+
+    // Add flow execution id for this compilation
+    long flowExecutionId = System.currentTimeMillis();
+    jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+        ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+    // Reset properties in Spec from Config
+    jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
+    return jobSpec;
+  }
+
+  /**
+   * Ideally each edge has its own eligible template repository(Based on {@link SpecExecutor})
+   * to pick templates from.
+   *
+   * This function is to transform from all mixed templates ({@link #templateCatalog})
+   * into categorized {@link #edgeTemplateMap}.
+   *
+   */
+  abstract protected void populateEdgeTemplateMap();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
new file mode 100644
index 0000000..751bb09
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigObject;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class FlowEdgeProps {
+  protected static final boolean DEFAULT_EDGE_SAFETY = true;
+
+  /**
+   * Contains read-only properties that users want to package in.
+   */
+  @Getter
+  protected Config config;
+
+  /**
+   * One of the mutable properties of an edge.
+   */
+  @Getter
+  @Setter
+  private boolean isEdgeSecure;
+
+  public FlowEdgeProps(Config config) {
+    this.config = config;
+    isEdgeSecure = getInitialEdgeSafety();
+  }
+
+  public FlowEdgeProps() {
+    this(ConfigFactory.empty());
+  }
+
+  /**
+   * When initializing an edge, load and security value from properties will be used
+   * but could be overridden afterwards.
+   */
+  private boolean getInitialEdgeSafety() {
+    return
+        config.hasPath(EDGE_SECURITY_KEY) ? config.getBoolean(EDGE_SECURITY_KEY) : DEFAULT_EDGE_SAFETY;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index 9e5fd11..3fb20a2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -17,188 +17,82 @@
 
 package org.apache.gobblin.service.modules.flow;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
+
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCompiler;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.ServiceMetricNames;
-import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
 /***
  * Take in a logical {@link Spec} ie flow and compile corresponding materialized job {@link Spec}
- * and its mapping to {@link SpecExecutorInstance}.
+ * and its mapping to {@link SpecExecutor}.
  */
 @Alpha
-public class IdentityFlowToJobSpecCompiler implements SpecCompiler {
-
-  private final Map<URI, TopologySpec> topologySpecMap;
-  private final Config config;
-  private final Logger log;
-  private final Optional<FSJobCatalog> templateCatalog;
-
-  protected final MetricContext metricContext;
-  @Getter
-  private Optional<Meter> flowCompilationSuccessFulMeter;
-  @Getter
-  private Optional<Meter> flowCompilationFailedMeter;
-  @Getter
-  private Optional<Timer> flowCompilationTimer;
+public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
 
   public IdentityFlowToJobSpecCompiler(Config config) {
-    this(config, true);
+    super(config, true);
   }
 
   public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) {
-    this(config, Optional.<Logger>absent(), instrumentationEnabled);
+    super(config, Optional.<Logger>absent(), instrumentationEnabled);
   }
 
   public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
-    this(config, log, true);
+    super(config, log, true);
   }
 
   public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
-    this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    if (instrumentationEnabled) {
-      this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
-      this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
-      this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
-      this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
-    }
-    else {
-      this.metricContext = null;
-      this.flowCompilationSuccessFulMeter = Optional.absent();
-      this.flowCompilationFailedMeter = Optional.absent();
-      this.flowCompilationTimer = Optional.absent();
-    }
-
-    this.topologySpecMap = Maps.newConcurrentMap();
-    this.config = config;
-    /***
-     * For multi-tenancy, the following needs to be added:
-     * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
-     * 2. Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that
-     */
-    try {
-      if (this.config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
-          && StringUtils.isNotBlank(this.config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
-        Config templateCatalogCfg = config
-            .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-                this.config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-        this.templateCatalog = Optional.of(new FSJobCatalog(templateCatalogCfg));
-      } else {
-        this.templateCatalog = Optional.absent();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Could not initialize IdentityFlowToJobSpecCompiler because of "
-          + "TemplateCatalog initialization failure", e);
-    }
+    super(config, log, instrumentationEnabled);
   }
 
   @Override
-  public Map<Spec, SpecExecutorInstanceProducer> compileFlow(Spec spec) {
+  public Map<Spec, SpecExecutor> compileFlow(Spec spec) {
     Preconditions.checkNotNull(spec);
     Preconditions.checkArgument(spec instanceof FlowSpec, "IdentityFlowToJobSpecCompiler only converts FlowSpec to JobSpec");
 
     long startTime = System.nanoTime();
-    Map<Spec, SpecExecutorInstanceProducer> specExecutorInstanceMap = Maps.newLinkedHashMap();
+    Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap();
 
     FlowSpec flowSpec = (FlowSpec) spec;
     String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY);
     String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));
 
-    JobSpec jobSpec;
-    JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri())
-        .withConfig(flowSpec.getConfig())
-        .withDescription(flowSpec.getDescription())
-        .withVersion(flowSpec.getVersion());
-
-    if (flowSpec.getTemplateURIs().isPresent() && templateCatalog.isPresent()) {
-      // Only first template uri will be honored for Identity
-      jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next());
-      try {
-        jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
-        log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
-      } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
-        throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
-      }
-    } else {
-      jobSpec = jobSpecBuilder.build();
-      log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
-    }
-
-    // Remove schedule
-    jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
-
-    // Add job.name and job.group
-    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
-      jobSpec.setConfig(jobSpec.getConfig()
-          .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY)));
-    }
-    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
-      jobSpec.setConfig(jobSpec.getConfig()
-          .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
-    }
-
-    // Add flow execution id for this compilation
-    long flowExecutionId = System.currentTimeMillis();
-    jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
-        ConfigValueFactory.fromAnyRef(flowExecutionId)));
-
-    // Reset properties in Spec from Config
-    jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
+    JobSpec jobSpec = jobSpecGenerator(flowSpec);
 
     for (TopologySpec topologySpec : topologySpecMap.values()) {
       try {
-        Map<String, String> capabilities = (Map<String, String>) topologySpec.getSpecExecutorInstanceProducer().getCapabilities().get();
-        for (Map.Entry<String, String> capability : capabilities.entrySet()) {
+        Map<ServiceNode, ServiceNode> capabilities = (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get();
+        for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) {
           log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with "
-              + "capability of source: %s and destination: %s ", jobSpec.getUri(),
+                  + "capability of source: %s and destination: %s ", jobSpec.getUri(),
               topologySpec.getUri(), capability.getKey(), capability.getValue()));
-          if (source.equals(capability.getKey()) && destination.equals(capability.getValue())) {
-            specExecutorInstanceMap.put(jobSpec, topologySpec.getSpecExecutorInstanceProducer());
+          if (source.equals(capability.getKey().getNodeName()) && destination.equals(capability.getValue().getNodeName())) {
+            specExecutorMap.put(jobSpec, topologySpec.getSpecExecutor());
             log.info(String.format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.",
                 jobSpec.getUri(), topologySpec.getUri()));
 
             log.info("Since we found a candidate executor, we will not try to compute more. "
                 + "(Intended limitation for IdentityFlowToJobSpecCompiler)");
-            return specExecutorInstanceMap;
+            return specExecutorMap;
           }
         }
       } catch (InterruptedException | ExecutionException e) {
@@ -209,52 +103,12 @@ public class IdentityFlowToJobSpecCompiler implements SpecCompiler {
     Instrumented.markMeter(this.flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
 
-    return specExecutorInstanceMap;
-  }
-
-  @Override
-  public Map<URI, TopologySpec> getTopologySpecMap() {
-    return this.topologySpecMap;
-  }
-
-  @Override
-  public void onAddSpec(Spec addedSpec) {
-    topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
-  }
-
-  @Override
-  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
-    topologySpecMap.remove(deletedSpecURI);
-  }
-
-  @Override
-  public void onUpdateSpec(Spec updatedSpec) {
-    topologySpecMap.put(updatedSpec.getUri(), (TopologySpec) updatedSpec);
-  }
-
-  @Nonnull
-  @Override
-  public MetricContext getMetricContext() {
-    return this.metricContext;
-  }
-
-  @Override
-  public boolean isInstrumentationEnabled() {
-    return null != this.metricContext;
-  }
-
-  @Override
-  public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
+    return specExecutorMap;
   }
 
   @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
+  protected void populateEdgeTemplateMap() {
+    log.warn("No population of templates based on edge happen in this implementation");
+    return;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
new file mode 100644
index 0000000..83b94e3
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import com.typesafe.config.Config;
+
+import org.jgrapht.graph.DefaultWeightedEdge;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.FlowEdge;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
+import lombok.Getter;
+
+/**
+ * A base implementation of a flowEdge in the weighted multi-edge graph.
+ * For a weightedMultiGraph there could be multiple edges between two vertices.
+ * Recall that a triplet of {@code <sourceNode, targetNode, specExecutor>} determines one edge.
+ * It is expected that {@link org.jgrapht.graph.DirectedWeightedMultigraph#getAllEdges(Object, Object)}
+ * can return multiple edges with the same pair of source and destination but different SpecExecutor.
+ *
+ * Each edge has a {@link FlowEdgeProps} which contains mutable and immutable properties.
+ * The {@link LoadBasedFlowEdgeImpl} exposes two mutable properties: Load and Security.
+ *
+ * Load of an edge is equivalent to weight defined in {@link DefaultWeightedEdge}.
+ * Since {@link #getWeight()} method is protected, {@link #getEdgeLoad()} will return the load.
+ * There is no setLoad method; the load is expected to be set by invoking
+ * {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}.
+ *
+ * Security of an edge describes whether the edge is currently secure enough to be part of a data movement path.
+ *
+ */
+@Alpha
+public class LoadBasedFlowEdgeImpl extends DefaultWeightedEdge implements FlowEdge {
+
+  /**
+   * In our case {@link LoadBasedFlowEdgeImpl} is not likely to be serialized.
+   * However, since it extends {@link DefaultWeightedEdge}, as a best practice we made all fields transient
+   * and specified a serialVersionUID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  @Getter
+  private transient ServiceNode sourceNode;
+  @Getter
+  private transient ServiceNode targetNode;
+  @Getter
+  private transient SpecExecutor specExecutorInstance;
+
+  /**
+   * Contains both read-only and mutable properties of an edge.
+   * Mutable properties in {@link FlowEdgeProps} expose their setters and getters
+   * through either {@link FlowEdgeProps} itself
+   * or a graph-level API, e.g. {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}.
+   *
+   * Typical mutable properties of an edge include:
+   * Load(Weight), Security.
+   */
+  private final transient FlowEdgeProps flowEdgeProps;
+
+  public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
+      FlowEdgeProps flowEdgeProps, SpecExecutor specExecutorInstance) {
+    this.sourceNode = sourceNode;
+    this.targetNode = targetNode;
+    this.flowEdgeProps = flowEdgeProps;
+    this.specExecutorInstance = specExecutorInstance;
+  }
+
+  public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
+      SpecExecutor specExecutor) {
+    this(sourceNode, targetNode, new FlowEdgeProps(specExecutor.getAttrs()),
+        specExecutor);
+  }
+
+  // Load: Directly using {@link DefaultWeightedEdge}'s weight field.
+  /**
+   * Load:
+   * Initialization: super's default constructor
+   * Getter: {@link #getEdgeLoad()} through {@link DefaultWeightedEdge}'s {@link #getWeight()}.
+   * Setter: Through {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}
+   */
+  public double getEdgeLoad() {
+    return getWeight();
+  }
+
+  // Security: Get/Set thru. FlowEdgeProps
+  /**
+   * Initialization\Getter\Setter: By {@link FlowEdgeProps}
+   */
+  public boolean getIsEdgeSecure() {
+    return flowEdgeProps.isEdgeSecure();
+  }
+  public void setIsEdgeSecure(boolean isEdgeSecure) {
+    this.flowEdgeProps.setEdgeSecure(isEdgeSecure);
+  }
+
+
+  @Override
+  public String getEdgeIdentity() {
+    return this.calculateEdgeIdentity(this.sourceNode, this.targetNode, this.specExecutorInstance);
+  }
+
+  @Override
+  public Config getEdgeProperties() {
+    return this.flowEdgeProps.getConfig();
+  }
+
+  @Override
+  /**
+   * Naive rule: If edge is secure, then it is qualified to be considered in path-finding.
+   */
+  public boolean isEdgeEnabled() {
+    return this.flowEdgeProps.isEdgeSecure();
+  }
+
+
+  /**
+   * A naive implementation of edge identity calculation.
+   * @return the edge identity, formed as sourceNodeName-specExecutorUri-targetNodeName
+   */
+  public static String calculateEdgeIdentity(ServiceNode sourceNode, ServiceNode targetNode, SpecExecutor specExecutorInstance){
+    return sourceNode.getNodeName() + "-" + specExecutorInstance.getUri() + "-" + targetNode.getNodeName();
+  }
+
+  /**
+   * Recall that we need a triplet to uniquely define a {@link FlowEdge}:
+   * - {@link ServiceNode} sourceNode
+   * - {@link ServiceNode} targetNode
+   * - {@link SpecExecutor} SpecExecutor
+   *
+   * We DO NOT distinguish between two edges by other props like weight,
+   * as the load should be an attribute of an edge.
+   * These are IntelliJ-generated methods for equals and hashCode().
+   *
+   * @param o The object being compared
+   * @return If two {@link LoadBasedFlowEdgeImpl} are equivalent.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    LoadBasedFlowEdgeImpl that = (LoadBasedFlowEdgeImpl) o;
+
+    if (!sourceNode.equals(that.sourceNode)) {
+      return false;
+    }
+    if (!targetNode.equals(that.targetNode)) {
+      return false;
+    }
+    return specExecutorInstance.equals(that.specExecutorInstance);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = sourceNode.hashCode();
+    result = 31 * result + targetNode.hashCode();
+    result = 31 * result + specExecutorInstance.hashCode();
+    return result;
+  }
+}
\ No newline at end of file


Mime
View raw message