gobblin-commits mailing list archives

From a...@apache.org
Subject [4/4] incubator-gobblin git commit: [GOBBLIN-3] Multi-hop flow compiler implementation
Date Tue, 12 Sep 2017 09:30:04 GMT
[GOBBLIN-3] Multi-hop flow compiler implementation

Closes #2078 from autumnust/flowcompiler


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9402a903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9402a903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9402a903

Branch: refs/heads/master
Commit: 9402a9037554bcae4cc958a69a85eb4a16e8c179
Parents: ea5047e
Author: Lei Sun <autumnust@gmail.com>
Authored: Tue Sep 12 02:29:05 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Tue Sep 12 02:29:21 2017 -0700

----------------------------------------------------------------------
 conf/service/application.conf                   |   2 +-
 .../apache/gobblin/runtime/api/FlowEdge.java    |  51 +++
 .../apache/gobblin/runtime/api/ServiceNode.java |  43 +++
 .../gobblin/runtime/api/SpecConsumer.java       |  35 ++
 .../gobblin/runtime/api/SpecExecutor.java       |  79 +++++
 .../runtime/api/SpecExecutorInstance.java       |  71 ----
 .../api/SpecExecutorInstanceConsumer.java       |  30 --
 .../api/SpecExecutorInstanceProducer.java       |  44 ---
 .../gobblin/runtime/api/SpecProducer.java       |  46 +++
 .../gobblin/service/ServiceConfigKeys.java      | 109 +++++++
 .../GobblinClusterConfigurationKeys.java        |  12 +-
 .../ScheduledJobConfigurationManager.java       |  49 ++-
 .../StreamingJobConfigurationManager.java       |  55 ++--
 .../orchestration/AzkabanSpecExecutor.java      |  76 +++++
 .../AzkabanSpecExecutorInstance.java            | 108 ------
 .../AzkabanSpecExecutorInstanceProducer.java    | 176 ----------
 .../orchestration/AzkabanSpecProducer.java      | 176 ++++++++++
 .../service/SimpleKafkaSpecConsumer.java        | 264 +++++++++++++++
 .../service/SimpleKafkaSpecExecutor.java        | 105 ++++++
 .../SimpleKafkaSpecExecutorInstance.java        | 131 --------
 ...SimpleKafkaSpecExecutorInstanceConsumer.java | 261 ---------------
 ...SimpleKafkaSpecExecutorInstanceProducer.java | 139 --------
 .../service/SimpleKafkaSpecProducer.java        | 140 ++++++++
 .../service/StreamingKafkaSpecConsumer.java     | 173 ++++++++++
 ...eamingKafkaSpecExecutorInstanceConsumer.java | 171 ----------
 .../SimpleKafkaSpecExecutorInstanceTest.java    | 180 ----------
 .../service/SimpleKafkaSpecExecutorTest.java    | 180 ++++++++++
 .../StreamingKafkaSpecExecutorInstanceTest.java | 192 -----------
 .../service/StreamingKafkaSpecExecutorTest.java | 191 +++++++++++
 .../gobblin/runtime/api/SpecCompiler.java       |  10 +-
 .../gobblin/runtime/api/TopologySpec.java       |  66 ++--
 .../job_monitor/AvroJobSpecKafkaJobMonitor.java |  20 +-
 .../AbstractSpecExecutor.java                   | 188 +++++++++++
 .../BaseServiceNodeImpl.java                    | 100 ++++++
 .../InMemorySpecExecutor.java                   |  93 ++++++
 .../InMemorySpecExecutorInstanceProducer.java   | 147 ---------
 .../InMemorySpecProducer.java                   |  82 +++++
 .../gobblin/spec_catalog/FlowCatalogTest.java   |   8 +-
 .../spec_catalog/TopologyCatalogTest.java       |  10 +-
 gobblin-service/build.gradle                    |   1 +
 .../org/apache/gobblin/service/HelixUtils.java  | 110 -------
 .../gobblin/service/ServiceConfigKeys.java      |  80 -----
 .../modules/core/GobblinServiceManager.java     |   2 +-
 .../modules/flow/BaseFlowToJobSpecCompiler.java | 259 +++++++++++++++
 .../service/modules/flow/FlowEdgeProps.java     |  67 ++++
 .../flow/IdentityFlowToJobSpecCompiler.java     | 192 ++---------
 .../modules/flow/LoadBasedFlowEdgeImpl.java     | 180 ++++++++++
 .../flow/MultiHopsFlowToJobSpecCompiler.java    | 313 ++++++++++++++++++
 .../modules/orchestration/Orchestrator.java     |  27 +-
 .../service/modules/policy/ServicePolicy.java   |  51 +++
 .../modules/policy/StaticServicePolicy.java     |  98 ++++++
 .../scheduler/GobblinServiceJobScheduler.java   |   2 +-
 .../ConfigBasedTopologySpecFactory.java         |  27 +-
 .../service/modules/utils/DistancedNode.java    |  77 +++++
 .../service/modules/utils/FindPathUtils.java    | 109 +++++++
 .../service/modules/utils/HelixUtils.java       | 110 +++++++
 .../modules/core/GobblinServiceHATest.java      |   9 +-
 .../modules/core/GobblinServiceManagerTest.java |   6 +-
 .../core/IdentityFlowToJobSpecCompilerTest.java |  21 +-
 .../MultiHopsFlowToJobSpecCompilerTest.java     | 326 +++++++++++++++++++
 .../modules/orchestration/OrchestratorTest.java |  31 +-
 .../ConfigBasedTopologySpecFactoryTest.java     |  10 +-
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 63 files changed, 3921 insertions(+), 2201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/conf/service/application.conf
----------------------------------------------------------------------
diff --git a/conf/service/application.conf b/conf/service/application.conf
index 3e292a0..3cb5b34 100644
--- a/conf/service/application.conf
+++ b/conf/service/application.conf
@@ -25,7 +25,7 @@ topologySpecFactory.topologyNames=localGobblinCluster
 topologySpecFactory.localGobblinCluster.description="StandaloneClusterTopology"
 topologySpecFactory.localGobblinCluster.version="1"
 topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
-topologySpecFactory.localGobblinCluster.specExecutorInstanceProducer.class="org.apache.gobblin.service.SimpleKafkaSpecExecutorInstanceProducer"
+topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.service.SimpleKafkaSpecProducer"
 topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="externalSource:InternalSink"
 topologySpecFactory.localGobblinCluster.writer.kafka.topics="SimpleKafkaSpecExecutorInstanceTest"
 topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.bootstrap.servers="localhost:9092"
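
For reference, a minimal sketch (not part of this commit) of how the renamed specExecutorInstance.class key above could be resolved and instantiated via reflection. The class name and the assumption that the configured class exposes a single-Config constructor are illustrative; the real wiring lives in ConfigBasedTopologySpecFactory.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SpecExecutorConfigSketch {
  public static void main(String[] args) throws Exception {
    // Loads application.conf from the classpath (Typesafe Config default behavior).
    Config config = ConfigFactory.load();

    // Key renamed in this commit: specExecutorInstanceProducer.class -> specExecutorInstance.class
    String className =
        config.getString("topologySpecFactory.localGobblinCluster.specExecutorInstance.class");

    // Illustrative reflection-based instantiation; assumes a (Config) constructor.
    Object specProducer = Class.forName(className).getConstructor(Config.class).newInstance(config);
    System.out.println("Instantiated " + specProducer.getClass().getName());
  }
}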

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java
new file mode 100644
index 0000000..9dc6413
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A typical edge consists of two types of attributes:
+ * - Numerical value based: Return a numerical value for evaluation.
+ * - Boolean value based: Return either true or false.
+ */
+public interface FlowEdge {
+
+  /**
+   * @return Uniqueness of an edge is defined by
+   * - sourceNode
+   * - targetNode
+   * - SpecExecutor
+   * hashCode and equals are required to be implemented accordingly.
+   */
+  String getEdgeIdentity();
+
+  /**
+   * Return the read-only edge properties.
+   * @return
+   */
+  Config getEdgeProperties();
+
+  /**
+   * @return If an edge should be considered as part of the flow spec compilation result,
+   * based on all boolean-based properties like safety.
+   */
+  boolean isEdgeEnabled();
+
+}
\ No newline at end of file
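
As an illustration of the contract above, a minimal FlowEdge sketch; the concrete implementation added by this commit is LoadBasedFlowEdgeImpl further below, and the field layout and identity format here are assumptions.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.runtime.api.FlowEdge;
import org.apache.gobblin.runtime.api.ServiceNode;
import org.apache.gobblin.runtime.api.SpecExecutor;

/** Illustrative edge whose identity combines sourceNode, targetNode and SpecExecutor URI. */
public class SimpleFlowEdge implements FlowEdge {
  private final ServiceNode sourceNode;
  private final ServiceNode targetNode;
  private final SpecExecutor specExecutor;

  public SimpleFlowEdge(ServiceNode sourceNode, ServiceNode targetNode, SpecExecutor specExecutor) {
    this.sourceNode = sourceNode;
    this.targetNode = targetNode;
    this.specExecutor = specExecutor;
  }

  @Override
  public String getEdgeIdentity() {
    // The three attributes that define uniqueness; equals/hashCode should be based on this value.
    return sourceNode.getNodeName() + ":" + targetNode.getNodeName() + ":" + specExecutor.getUri();
  }

  @Override
  public Config getEdgeProperties() {
    return ConfigFactory.empty();
  }

  @Override
  public boolean isEdgeEnabled() {
    // Usable only if both endpoints are enabled.
    return sourceNode.isNodeEnabled() && targetNode.isNodeEnabled();
  }
}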

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java
new file mode 100644
index 0000000..eeb74c4
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+
+/**
+ * Abstraction of a node in a {@link SpecExecutor}.
+ * 'Service' here refers to 'Service' in GaaS and is not necessarily related to a Service interface.
+ */
+public interface ServiceNode {
+  /**
+   * @return The name of the node.
+   * It should be the identifier of a {@link ServiceNode}.
+   */
+  String getNodeName();
+
+  /**
+   * @return The attributes of a {@link ServiceNode}.
+   */
+  Config getNodeProps();
+
+  /**
+   * @return Whether the node is valid to use.
+   */
+  boolean isNodeEnabled();
+
+}
\ No newline at end of file
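
A comparable sketch for ServiceNode (the commit's concrete class is BaseServiceNodeImpl; the 'node.disabled' attribute used here is an assumed convention, not a key defined by the commit):

import com.typesafe.config.Config;

import org.apache.gobblin.runtime.api.ServiceNode;

/** Illustrative node keyed by name, carrying read-only attributes. */
public class SimpleServiceNode implements ServiceNode {
  private final String nodeName;
  private final Config nodeProps;

  public SimpleServiceNode(String nodeName, Config nodeProps) {
    this.nodeName = nodeName;
    this.nodeProps = nodeProps;
  }

  @Override
  public String getNodeName() {
    return nodeName;
  }

  @Override
  public Config getNodeProps() {
    return nodeProps;
  }

  @Override
  public boolean isNodeEnabled() {
    // Assumed convention: enabled unless explicitly disabled in the node attributes.
    return !(nodeProps.hasPath("node.disabled") && nodeProps.getBoolean("node.disabled"));
  }
}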

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
new file mode 100644
index 0000000..6dc16f4
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
+
+
+/**
+ * A communication socket (the receiving side)
+ * for each {@link SpecExecutor} to receive {@link Spec}s to execute from the Orchestrator.
+ * Implementations of this interface should specify the communication channel (e.g. Kafka, REST, etc.).
+ */
+public interface SpecConsumer<V>  {
+
+  /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */
+  Future<? extends List<Pair<SpecExecutor.Verb, V>>> changedSpecs();
+
+}
\ No newline at end of file
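
A brief consuming-side usage sketch (variable names are placeholders; the cluster-side managers changed later in this diff follow the same changedSpecs()/Verb dispatch pattern):

import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;

public class SpecConsumerLoopSketch {
  /** Drain one batch of changed specs and dispatch on the Verb. */
  public static void drainOnce(SpecConsumer<Spec> specConsumer) throws Exception {
    List<Pair<SpecExecutor.Verb, Spec>> changedSpecs = specConsumer.changedSpecs().get();
    for (Pair<SpecExecutor.Verb, Spec> entry : changedSpecs) {
      switch (entry.getKey()) {
        case ADD:    /* schedule the newly added Spec */   break;
        case UPDATE: /* refresh the existing Spec */       break;
        case DELETE: /* remove the Spec */                 break;
      }
    }
  }
}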

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
new file mode 100644
index 0000000..cb5197a
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Defines a representation of JobSpec-Executor in GaaS.
+ * A triplet of <technology, location, communication mechanism> uniquely defines a SpecExecutor object.
+ * e.g. <Lumos, Holdem, Rest> represents an executor that moves data via Lumos, runs on Holdem and can be reached via Rest.
+ */
+public interface SpecExecutor {
+  /** A URI identifying the SpecExecutor. */
+  URI getUri();
+
+  /** Human-readable description of the SpecExecutor. */
+  Future<String> getDescription();
+
+  /** SpecExecutor config as a typesafe config object. */
+  Future<Config> getConfig();
+
+  /** SpecExecutor attributes include the location of the SpecExecutor and its type (the technology it uses for data movement,
+   * e.g. gobblin-standalone/gobblin-cluster).
+   * SpecExecutor attributes are supposed to be read-only once instantiated.
+   */
+  Config getAttrs();
+
+  /** Health of SpecExecutor. */
+  Future<String> getHealth();
+
+  /** Source : Destination processing capabilities of SpecExecutor. */
+  Future<? extends Map<ServiceNode, ServiceNode>> getCapabilities();
+
+  /** A communication socket for sending {@link Spec}s to the assigned physical executors, paired with
+   * a consumer on the physical executor side. */
+  Future<? extends SpecProducer> getProducer();
+
+  public static enum Verb {
+    ADD(1, "add"),
+    UPDATE(2, "update"),
+    DELETE(3, "delete");
+
+    private int _id;
+    private String _verb;
+
+    Verb(int id, String verb) {
+      _id = id;
+      _verb = verb;
+    }
+
+    public int getId() {
+      return _id;
+    }
+
+    public String getVerb() {
+      return _verb;
+    }
+  }
+}
\ No newline at end of file
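
And a short orchestrator-side sketch of pairing the producer handle with a compiled spec (the submit helper and its arguments are hypothetical):

import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;

public class SpecSubmissionSketch {
  /** Hand a compiled Spec to whichever SpecExecutor the flow compiler selected. */
  @SuppressWarnings("unchecked")
  public static void submit(SpecExecutor specExecutor, Spec compiledSpec) throws Exception {
    SpecProducer<Spec> producer = (SpecProducer<Spec>) specExecutor.getProducer().get();
    producer.addSpec(compiledSpec).get();  // block until the producer acknowledges the ADD
  }
}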

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java
deleted file mode 100644
index 47569f3..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.api;
-
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * Defines a SpecExecutorInstance (typically a standalone instance, cluster or Azkaban deployment)
- * that can execute a {@link Spec}.
- */
-@Alpha
-public interface SpecExecutorInstance {
-  /** An URI identifying the SpecExecutorInstance. */
-  URI getUri();
-
-  /** Human-readable description of the SpecExecutorInstance .*/
-  Future<String> getDescription();
-
-  /** SpecExecutorInstance config as a typesafe config object. */
-  Future<Config> getConfig();
-
-  /** Health of SpecExecutorInstance. */
-  Future<String> getHealth();
-
-  /** Source : Destination processing capabilities of SpecExecutorInstance. */
-  Future<? extends Map<String, String>> getCapabilities();
-
-  public static enum Verb {
-    ADD(1, "add"),
-    UPDATE(2, "update"),
-    DELETE(3, "delete");
-
-    private int _id;
-    private String _verb;
-
-    Verb(int id, String verb) {
-      _id = id;
-      _verb = verb;
-    }
-
-    public int getId() {
-      return _id;
-    }
-
-    public String getVerb() {
-      return _verb;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java
deleted file mode 100644
index 475c5af..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.api;
-
-import java.util.List;
-import java.util.concurrent.Future;
-import org.apache.commons.lang3.tuple.Pair;
-
-
-public interface SpecExecutorInstanceConsumer<V> extends SpecExecutorInstance {
-
-  /** List of newly changed {@link Spec}s for execution on {@link SpecExecutorInstance}. */
-  Future<? extends List<Pair<Verb, V>>> changedSpecs();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java
deleted file mode 100644
index 12508da..0000000
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.runtime.api;
-
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import org.apache.gobblin.annotation.Alpha;
-
-
-/**
- * Defines a SpecExecutorInstanceProducer to produce jobs to {@link SpecExecutorInstance}
- * that can execute a {@link Spec}.
- */
-@Alpha
-public interface SpecExecutorInstanceProducer<V> extends SpecExecutorInstance {
-  /** Add a {@link Spec} for execution on {@link SpecExecutorInstance}. */
-  Future<?> addSpec(V addedSpec);
-
-  /** Update a {@link Spec} being executed on {@link SpecExecutorInstance}. */
-  Future<?> updateSpec(V updatedSpec);
-
-  /** Delete a {@link Spec} being executed on {@link SpecExecutorInstance}. */
-  Future<?> deleteSpec(URI deletedSpecURI);
-
-  /** List all {@link Spec} being executed on {@link SpecExecutorInstance}. */
-  Future<? extends List<V>> listSpecs();
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
new file mode 100644
index 0000000..9b9e504
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Defines a SpecProducer that submits jobs to a {@link SpecExecutor}
+ * which can execute a {@link Spec}.
+ *
+ * A handle on the Orchestrator side to send {@link Spec}s.
+ */
+@Alpha
+public interface SpecProducer<V> {
+  /** Add a {@link Spec} for execution on {@link SpecExecutor}. */
+  Future<?> addSpec(V addedSpec);
+
+  /** Update a {@link Spec} being executed on {@link SpecExecutor}. */
+  Future<?> updateSpec(V updatedSpec);
+
+  /** Delete a {@link Spec} being executed on {@link SpecExecutor}. */
+  Future<?> deleteSpec(URI deletedSpecURI);
+
+  /** List all {@link Spec} being executed on {@link SpecExecutor}. */
+  Future<? extends List<V>> listSpecs();
+}
\ No newline at end of file
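
For illustration, a trimmed-down map-backed producer in the spirit of the InMemorySpecProducer added by this commit (a sketch, not a copy of that class; CompletedFuture is used the same way as elsewhere in this diff):

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.util.CompletedFuture;

/** Illustrative producer that just records specs in memory, keyed by URI. */
public class MapBackedSpecProducer implements SpecProducer<Spec> {
  private final Map<URI, Spec> specs = new ConcurrentHashMap<>();

  @Override
  public Future<?> addSpec(Spec addedSpec) {
    specs.put(addedSpec.getUri(), addedSpec);
    return new CompletedFuture<>(Boolean.TRUE, null);
  }

  @Override
  public Future<?> updateSpec(Spec updatedSpec) {
    specs.put(updatedSpec.getUri(), updatedSpec);
    return new CompletedFuture<>(Boolean.TRUE, null);
  }

  @Override
  public Future<?> deleteSpec(URI deletedSpecURI) {
    specs.remove(deletedSpecURI);
    return new CompletedFuture<>(Boolean.TRUE, null);
  }

  @Override
  public Future<? extends List<Spec>> listSpecs() {
    return new CompletedFuture<List<Spec>>(new ArrayList<>(specs.values()), null);
  }
}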

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
new file mode 100644
index 0000000..7231e0c
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import org.apache.gobblin.annotation.Alpha;
+
+@Alpha
+public class ServiceConfigKeys {
+
+  private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+
+  // Gobblin Service Manager Keys
+  public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
+  public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
+  public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
+  public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";
+  public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
+  public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
+  public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
+
+  // Helix / ServiceScheduler Keys
+  public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name";
+  public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_SERVICE_PREFIX + "zk.connection.string";
+  public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name";
+  public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName";
+  public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec";
+
+  // Helix message sub types for FlowSpec
+  public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
+  public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE";
+  public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE";
+
+  // Flow Compiler Keys
+  public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
+  /**
+   * Directly use the canonical class name here to avoid introducing an additional dependency.
+   */
+  public static final String DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS =
+      "org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler";
+
+  // Flow specific Keys
+  public static final String FLOW_SOURCE_IDENTIFIER_KEY = "gobblin.flow.sourceIdentifier";
+  public static final String FLOW_DESTINATION_IDENTIFIER_KEY = "gobblin.flow.destinationIdentifier";
+
+  // Command line options
+  public static final String SERVICE_NAME_OPTION_NAME = "service_name";
+
+  // Topology Factory Keys (for overall factory)
+  public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory.";
+  public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY =
+      "org.apache.gobblin.service.modules.topology.ConfigBasedTopologySpecFactory";
+  public static final String TOPOLOGYSPEC_FACTORY_KEY = TOPOLOGY_FACTORY_PREFIX + "class";
+  public static final String TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY = TOPOLOGY_FACTORY_PREFIX + "topologyNames";
+
+  // Topology Factory Keys (for individual topologies)
+  public static final String TOPOLOGYSPEC_DESCRIPTION_KEY = "description";
+  public static final String TOPOLOGYSPEC_VERSION_KEY = "version";
+  public static final String TOPOLOGYSPEC_URI_KEY = "uri";
+
+  public static final String DEFAULT_SPEC_EXECUTOR =
+      "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor";
+  public static final String SPEC_EXECUTOR_KEY = "specExecutorInstance.class";
+  public static final String EDGE_SECURITY_KEY = "edge.secured";
+
+
+  // Template Catalog Keys
+  public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
+
+  // Keys related to user-specified policy on route selection.
+  // Undesired connection to form an executable JobSpec.
+  // Formatted as a String list, each entry contains a string in the format of "Source1:Sink1:URI",
+  // which indicates that data movement from source1 to sink1 with specific URI of specExecutor should be avoided.
+  public static final String POLICY_BASED_BLOCKED_CONNECTION = GOBBLIN_SERVICE_PREFIX + "blockedConnections";
+
+  // Comma-separated list of blacklisted nodes. Names listed here become the nodeName, which is the ID of a ServiceNode.
+  public static final String POLICY_BASED_BLOCKED_NODES = GOBBLIN_SERVICE_PREFIX + "blockedNodes";
+  // Complete path describing how the data movement is executed from source to sink.
+  // Formatted as a String, with hops separated by commas, ordered from source to sink.
+  public static final String POLICY_BASED_DATA_MOVEMENT_PATH = GOBBLIN_SERVICE_PREFIX + "fullDataPath";
+
+  public static final String ATTRS_PATH_IN_CONFIG = "executorAttrs";
+
+  // Gobblin Service Graph Representation Topology related Keys
+  public static final String NODE_SECURITY_KEY = "node.secured";
+  // True means node is by default secure.
+  public static final String DEFAULT_NODE_SECURITY = "true";
+
+
+  // Policy related configuration Keys
+  public static final String DEFAULT_SERVICE_POLICY = "static";
+  public static final String SERVICE_POLICY_NAME = GOBBLIN_SERVICE_PREFIX + "servicePolicy";
+  // Logging
+  public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
+}
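
As an illustration of the "Source1:Sink1:URI" format documented for POLICY_BASED_BLOCKED_CONNECTION above, a small parsing sketch (the BlockedConnection holder class is hypothetical):

import java.util.ArrayList;
import java.util.List;

import com.typesafe.config.Config;

import org.apache.gobblin.service.ServiceConfigKeys;

public class BlockedConnectionParserSketch {

  /** Hypothetical holder for one blocked source->sink hop on a given SpecExecutor URI. */
  public static final class BlockedConnection {
    public final String source;
    public final String sink;
    public final String specExecutorUri;

    BlockedConnection(String source, String sink, String specExecutorUri) {
      this.source = source;
      this.sink = sink;
      this.specExecutorUri = specExecutorUri;
    }
  }

  /** Parse gobblin.service.blockedConnections entries of the form "Source1:Sink1:URI". */
  public static List<BlockedConnection> parse(Config config) {
    List<BlockedConnection> blocked = new ArrayList<>();
    if (!config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) {
      return blocked;
    }
    for (String entry : config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) {
      String[] parts = entry.split(":", 3);  // the URI itself may contain ':' characters
      if (parts.length == 3) {
        blocked.add(new BlockedConnection(parts[0], parts[1], parts[2]));
      }
    }
    return blocked;
  }
}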

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index e8dfb1d..ea75dc3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -68,11 +68,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String JOB_CONFIGURATION_MANAGER_KEY = GOBBLIN_CLUSTER_PREFIX + "job.configuration.manager";
 
   public static final String JOB_SPEC_REFRESH_INTERVAL = GOBBLIN_CLUSTER_PREFIX + "job.spec.refresh.interval";
-  public static final String SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX + "specConsumer.class";
-  public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS =
-      "org.apache.gobblin.service.SimpleKafkaSpecExecutorInstanceConsumer";
-  public static final String DEFAULT_STREAMING_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS =
-      "org.apache.gobblin.service.StreamingKafkaSpecExecutorInstanceConsumer";
+  public static final String SPEC_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX + "specConsumer.class";
+  public static final String DEFAULT_SPEC_CONSUMER_CLASS =
+      "org.apache.gobblin.service.SimpleKafkaSpecConsumer";
+  public static final String DEFAULT_STREAMING_SPEC_CONSUMER_CLASS =
+      "org.apache.gobblin.service.StreamingKafkaSpecConsumer";
   public static final String JOB_CATALOG_KEY = GOBBLIN_CLUSTER_PREFIX + "job.catalog";
   public static final String DEFAULT_JOB_CATALOG =
       "org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog";
@@ -80,4 +80,4 @@ public class GobblinClusterConfigurationKeys {
   public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
   public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index 9290b5a..0f2d356 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -27,24 +27,23 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
 @Alpha
@@ -59,9 +58,9 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
 
   private final ScheduledExecutorService fetchJobSpecExecutor;
 
-  private final SpecExecutorInstanceConsumer specExecutorInstanceConsumer;
+  private final SpecConsumer _specConsumer;
 
-  private final ClassAliasResolver<SpecExecutorInstanceConsumer> aliasResolver;
+  private final ClassAliasResolver<SpecConsumer> aliasResolver;
 
   public ScheduledJobConfigurationManager(EventBus eventBus, Config config) {
     super(eventBus, config);
@@ -73,17 +72,17 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
     this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor")));
 
-    this.aliasResolver = new ClassAliasResolver<>(SpecExecutorInstanceConsumer.class);
+    this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
     try {
-      String specExecutorInstanceConsumerClassName = GobblinClusterConfigurationKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS;
-      if (config.hasPath(GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY)) {
-        specExecutorInstanceConsumerClassName = config.getString(GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY);
+      String specConsumerClassName = GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS;
+      if (config.hasPath(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY)) {
+        specConsumerClassName = config.getString(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY);
       }
-      LOGGER.info("Using SpecExecutorInstanceConsumer ClassNameclass name/alias " + specExecutorInstanceConsumerClassName);
-      this.specExecutorInstanceConsumer = (SpecExecutorInstanceConsumer) ConstructorUtils
-          .invokeConstructor(Class.forName(this.aliasResolver.resolve( specExecutorInstanceConsumerClassName)), config);
+      LOGGER.info("Using SpecConsumer ClassNameclass name/alias " + specConsumerClassName);
+      this._specConsumer = (SpecConsumer) ConstructorUtils
+          .invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)), config);
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
-          | ClassNotFoundException e) {
+        | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
   }
@@ -116,25 +115,25 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
    * @throws InterruptedException
    */
   private void fetchJobSpecs() throws ExecutionException, InterruptedException {
-    List<Pair<SpecExecutorInstance.Verb, Spec>> changesSpecs =
-        (List<Pair<SpecExecutorInstance.Verb, Spec>>) this.specExecutorInstanceConsumer.changedSpecs().get();
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs =
+        (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get();
 
-    for (Pair<SpecExecutorInstance.Verb, Spec> entry : changesSpecs) {
+    for (Pair<SpecExecutor.Verb, Spec> entry : changesSpecs) {
 
-      SpecExecutorInstance.Verb verb = entry.getKey();
-      if (verb.equals(SpecExecutorInstance.Verb.ADD)) {
+      SpecExecutor.Verb verb = entry.getKey();
+      if (verb.equals(SpecExecutor.Verb.ADD)) {
 
         // Handle addition
         JobSpec jobSpec = (JobSpec) entry.getValue();
         postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
         jobSpecs.put(entry.getValue().getUri(), (JobSpec) entry.getValue());
-      } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.UPDATE)) {
+      } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
 
         // Handle update
         JobSpec jobSpec = (JobSpec) entry.getValue();
         postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
         jobSpecs.put(entry.getValue().getUri(), (JobSpec) entry.getValue());
-      } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.DELETE)) {
+      } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
 
         // Handle delete
         Spec anonymousSpec = (Spec) entry.getValue();
@@ -148,4 +147,4 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager {
   protected void shutDown() throws Exception {
     ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index e660710..7370dc6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -25,30 +25,29 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.runtime.api.SpecConsumer;
 
 
 /**
- * A {@link JobConfigurationManager} that fetches job specs from a {@link SpecExecutorInstanceConsumer} in a loop
+ * A {@link JobConfigurationManager} that fetches job specs from a {@link SpecConsumer} in a loop
  * without
  */
 @Alpha
@@ -57,7 +56,7 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
 
   private final ExecutorService fetchJobSpecExecutor;
 
-  private final SpecExecutorInstanceConsumer specExecutorInstanceConsumer;
+  private final SpecConsumer specConsumer;
 
   private final long stopTimeoutSeconds;
 
@@ -71,23 +70,23 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor")));
 
     String specExecutorInstanceConsumerClassName =
-        ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY,
-            GobblinClusterConfigurationKeys.DEFAULT_STREAMING_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS);
+        ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
+            GobblinClusterConfigurationKeys.DEFAULT_STREAMING_SPEC_CONSUMER_CLASS);
 
-    LOGGER.info("Using SpecExecutorInstanceConsumer ClassNameclass name/alias " +
+    LOGGER.info("Using SpecConsumer ClassNameclass name/alias " +
         specExecutorInstanceConsumerClassName);
 
     try {
-      ClassAliasResolver<SpecExecutorInstanceConsumer> aliasResolver =
-          new ClassAliasResolver<>(SpecExecutorInstanceConsumer.class);
+      ClassAliasResolver<SpecConsumer> aliasResolver =
+          new ClassAliasResolver<>(SpecConsumer.class);
 
-      this.specExecutorInstanceConsumer = (SpecExecutorInstanceConsumer) GobblinConstructorUtils.invokeFirstConstructor(
+      this.specConsumer = (SpecConsumer) GobblinConstructorUtils.invokeFirstConstructor(
           Class.forName(aliasResolver.resolve(specExecutorInstanceConsumerClassName)),
           ImmutableList.<Object>of(config, jobCatalog),
           ImmutableList.<Object>of(config));
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
-          | ClassNotFoundException e) {
-      throw new RuntimeException("Could not construct SpecExecutorInstanceConsumer " +
+        | ClassNotFoundException e) {
+      throw new RuntimeException("Could not construct SpecConsumer " +
           specExecutorInstanceConsumerClassName, e);
     }
   }
@@ -97,8 +96,8 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
     LOGGER.info("Starting the " + StreamingJobConfigurationManager.class.getSimpleName());
 
     // if the instance consumer is a service then need to start it to consume job specs
-    if (this.specExecutorInstanceConsumer instanceof Service) {
-      ((Service) this.specExecutorInstanceConsumer).startAsync().awaitRunning();
+    if (this.specConsumer instanceof Service) {
+      ((Service) this.specConsumer).startAsync().awaitRunning();
     }
 
     // submit command to fetch job specs
@@ -120,25 +119,25 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
   }
 
   private void fetchJobSpecs() throws ExecutionException, InterruptedException {
-    List<Pair<SpecExecutorInstance.Verb, Spec>> changesSpecs =
-        (List<Pair<SpecExecutorInstance.Verb, Spec>>) this.specExecutorInstanceConsumer.changedSpecs().get();
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs =
+        (List<Pair<SpecExecutor.Verb, Spec>>) this.specConsumer.changedSpecs().get();
 
     // propagate thread interruption so that caller will exit from loop
     if (Thread.interrupted()) {
       throw new InterruptedException();
     }
 
-    for (Pair<SpecExecutorInstance.Verb, Spec> entry : changesSpecs) {
-      SpecExecutorInstance.Verb verb = entry.getKey();
-      if (verb.equals(SpecExecutorInstance.Verb.ADD)) {
+    for (Pair<SpecExecutor.Verb, Spec> entry : changesSpecs) {
+      SpecExecutor.Verb verb = entry.getKey();
+      if (verb.equals(SpecExecutor.Verb.ADD)) {
         // Handle addition
         JobSpec jobSpec = (JobSpec) entry.getValue();
         postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
-      } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.UPDATE)) {
+      } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
         // Handle update
         JobSpec jobSpec = (JobSpec) entry.getValue();
         postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
-      } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.DELETE)) {
+      } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
         // Handle delete
         Spec anonymousSpec = (Spec) entry.getValue();
         postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new Properties());
@@ -148,11 +147,11 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
 
   @Override
   protected void shutDown() throws Exception {
-    if (this.specExecutorInstanceConsumer instanceof Service) {
-      ((Service) this.specExecutorInstanceConsumer).stopAsync().awaitTerminated(this.stopTimeoutSeconds,
+    if (this.specConsumer instanceof Service) {
+      ((Service) this.specConsumer).stopAsync().awaitTerminated(this.stopTimeoutSeconds,
           TimeUnit.SECONDS);
     }
 
     ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
new file mode 100644
index 0000000..2e06817
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.util.CompletedFuture;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+public class AzkabanSpecExecutor extends AbstractSpecExecutor {
+
+  // Executor Instance
+  protected final Config _config;
+
+  private SpecProducer<Spec> azkabanSpecProducer;
+
+  public AzkabanSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+    _config = config.withFallback(defaultConfig);
+    azkabanSpecProducer = new AzkabanSpecProducer(_config, log);
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+  }
+
+
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    return new CompletedFuture<>(this.azkabanSpecProducer, null);
+  }
+
+  @Override
+  public Future<Config> getConfig() {
+    return new CompletedFuture<>(_config, null);
+  }
+
+  @Override
+  public Future<String> getHealth() {
+    return new CompletedFuture<>("Healthy", null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // nothing to do in default implementation
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // nothing to do in default implementation
+  }
+}
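
A brief construction sketch for the Azkaban-backed executor (hypothetical helper; the supplied Config must carry the Azkaban credential/server keys consumed by AzkabanSpecProducer, whose constructor authenticates against that server):

import com.google.common.base.Optional;
import com.typesafe.config.Config;

import org.slf4j.Logger;

import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.modules.orchestration.AzkabanSpecExecutor;

public class AzkabanExecutorSketch {
  /** Build an Azkaban-backed executor and return its producer handle. */
  @SuppressWarnings("unchecked")
  public static SpecProducer<Spec> producerFor(Config azkabanConfig) throws Exception {
    AzkabanSpecExecutor executor = new AzkabanSpecExecutor(azkabanConfig, Optional.<Logger>absent());
    return (SpecProducer<Spec>) executor.getProducer().get();
  }
}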

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
deleted file mode 100644
index dcc89cc..0000000
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.service.modules.orchestration;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
-  protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
-  protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
-
-  // Executor Instance
-  protected final Config _config;
-  protected final Logger _log;
-  protected final URI _specExecutorInstanceUri;
-  protected final Map<String, String> _capabilities;
-
-  public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
-    Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
-    _config = config.withFallback(defaultConfig);
-    _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    try {
-      _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
-          "NA"));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-    _capabilities = Maps.newHashMap();
-    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
-      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
-      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
-      for (String capability : capabilities) {
-        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
-        Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
-            + "per capability, found: " + currentCapability);
-        _capabilities.put(currentCapability.get(0), currentCapability.get(1));
-      }
-    }
-  }
-
-  @Override
-  public URI getUri() {
-    return _specExecutorInstanceUri;
-  }
-
-  @Override
-  public Future<String> getDescription() {
-    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null);
-  }
-
-  @Override
-  public Future<Config> getConfig() {
-    return new CompletedFuture<>(_config, null);
-  }
-
-  @Override
-  public Future<String> getHealth() {
-    return new CompletedFuture<>("Healthy", null);
-  }
-
-  @Override
-  public Future<? extends Map<String, String>> getCapabilities() {
-    return new CompletedFuture<>(_capabilities, null);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    // nothing to do in default implementation
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // nothing to do in default implementation
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
deleted file mode 100644
index 47df250..0000000
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.service.modules.orchestration;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import org.apache.commons.codec.EncoderException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-
-
-public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance
-    implements SpecExecutorInstanceProducer<Spec>, Closeable {
-
-  // Session Id for GaaS User
-  private String _sessionId;
-
-
-  public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
-    super(config, log);
-
-    try {
-      // Initialize Azkaban client / producer and cache credentials
-      String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
-      String azkabanPassword = getAzkabanPassword(_config);
-      String azkabanServerUrl = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
-
-      _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
-    } catch (IOException | EncoderException e) {
-      throw new RuntimeException("Could not authenticate with Azkaban", e);
-    }
-  }
-
-  private String getAzkabanPassword(Config config) {
-    if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) {
-      return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY);
-    }
-
-    return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY);
-  }
-
-  public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
-    this(config, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public AzkabanSpecExecutorInstanceProducer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public Future<?> addSpec(Spec addedSpec) {
-    // If project already exists, execute it
-    try {
-      AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
-      boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
-
-      // If project does not already exists, create and execute it
-      if (azkabanProjectExists) {
-        _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
-        AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
-            azkabanProjectConfig);
-      } else {
-        _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
-
-        // Deleted project also returns true if-project-exists check, so optimistically first create the project
-        // .. (it will create project if it was never created or deleted), if project exists it will fail with
-        // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
-        // .. specified
-        try {
-          createNewAzkabanProject(_sessionId, azkabanProjectConfig);
-        } catch (IOException e) {
-          if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
-            if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
-                ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
-              _log.info("Project already exists for this Spec, but force overwrite specified");
-              updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
-            } else {
-              _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
-                  azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
-            }
-          } else {
-            throw e;
-          }
-        }
-      }
-
-
-    } catch (IOException e) {
-      throw new RuntimeException("Issue in setting up Azkaban project.", e);
-    }
-
-    return new CompletedFuture<>(_config, null);
-  }
-
-  @Override
-  public Future<?> updateSpec(Spec updatedSpec) {
-    // Re-create project
-    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
-
-    try {
-      updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
-    } catch (IOException e) {
-      throw new RuntimeException("Issue in setting up Azkaban project.", e);
-    }
-
-    return new CompletedFuture<>(_config, null);
-  }
-
-  @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
-    // Delete project
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Future<? extends List<Spec>> listSpecs() {
-    throw new UnsupportedOperationException();
-  }
-
-  private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
-    // Create Azkaban Job
-    String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
-
-    // Schedule Azkaban Job
-    AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
-
-    _log.info(String.format("Azkaban project created: %smanager?project=%s",
-        azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
-  }
-
-  private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
-    _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
-        azkabanProjectConfig.getAzkabanProjectName()));
-
-    // Get project Id
-    String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
-
-    // Replace Azkaban Job
-    AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
-
-    // Change schedule
-    AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
new file mode 100644
index 0000000..5a491ab
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class AzkabanSpecProducer implements SpecProducer<Spec>, Closeable {
+
+  // Session Id for GaaS User
+  private String _sessionId;
+  private Config _config;
+
+  public AzkabanSpecProducer(Config config, Optional<Logger> log) {
+    this._config = config;
+    try {
+      // Initialize Azkaban client / producer and cache credentials
+      String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+      String azkabanPassword = getAzkabanPassword(_config);
+      String azkabanServerUrl = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+      _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+    } catch (IOException | EncoderException e) {
+      throw new RuntimeException("Could not authenticate with Azkaban", e);
+    }
+  }
+
+  private String getAzkabanPassword(Config config) {
+    if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) {
+      return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY);
+    }
+
+    return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY);
+  }
+
+  public AzkabanSpecProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public AzkabanSpecProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    // If project already exists, execute it
+    try {
+      AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+      boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+      // If the project does not already exist, create and execute it
+      if (azkabanProjectExists) {
+        log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+        AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+            azkabanProjectConfig);
+      } else {
+        log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+        // A deleted project still returns true for the if-project-exists check, so optimistically create the
+        // .. project first (this succeeds if it was never created or has been deleted). If the project already
+        // .. exists, the call fails with an appropriate exception message; catch that and run in replace-project
+        // .. mode if force overwrite is specified.
+        try {
+          createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+        } catch (IOException e) {
+          if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+            if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+                ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+              log.info("Project already exists for this Spec, but force overwrite specified");
+              updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+            } else {
+              log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                  azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            }
+          } else {
+            throw e;
+          }
+        }
+      }
+
+
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return new CompletedFuture<>(_config, null);
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    // Re-create project
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
+
+    try {
+      updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return new CompletedFuture<>(_config, null);
+  }
+
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+    // Delete project
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    // Create Azkaban Job
+    String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
+
+    // Schedule Azkaban Job
+    AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    log.info(String.format("Azkaban project created: %smanager?project=%s",
+        azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+  }
+
+  private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName()));
+
+    // Get project Id
+    String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
+
+    // Replace Azkaban Job
+    AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    // Change schedule
+    AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+}
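
A minimal usage sketch of the new producer (illustrative only, not part of this patch): it assumes the loaded Config already carries the Azkaban username/password/server-URL keys referenced by ServiceAzkabanConfigKeys, that an Azkaban server is reachable, and that the class name, flow URI, and metadata below are made up.

    import java.net.URI;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.runtime.api.JobSpec;
    import org.apache.gobblin.service.modules.orchestration.AzkabanSpecProducer;

    public class AzkabanSpecProducerSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical config source; the Azkaban connection keys are assumed to be present here.
        Config config = ConfigFactory.load();

        try (AzkabanSpecProducer producer = new AzkabanSpecProducer(config)) {
          // Illustrative JobSpec only; the URI, version and description are made up.
          JobSpec jobSpec = JobSpec.builder(new URI("gobblin-flow:/examples/hello-world"))
              .withVersion("1")
              .withDescription("illustrative spec")
              .build();

          // addSpec() creates the Azkaban project if needed (or executes it if it already exists)
          // and returns an already-completed Future, so get() just unwraps the result.
          producer.addSpec(jobSpec).get();
        }
      }
    }
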

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
new file mode 100644
index 0000000..083ccf3
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.Kafka08ConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.CompletedFuture;
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable {
+
+  // Consumer
+  protected final GobblinKafkaConsumerClient _kafka08Consumer;
+  protected final List<KafkaPartition> _partitions;
+  protected final List<Long> _lowWatermark;
+  protected final List<Long> _nextWatermark;
+  protected final List<Long> _highWatermark;
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  private int currentPartitionIdx = -1;
+  private boolean isFirstRun = true;
+
+  private final BinaryDecoder _decoder;
+  private final SpecificDatumReader<AvroJobSpec> _reader;
+  private final SchemaVersionWriter<?> _versionWriter;
+
+  public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) {
+
+    // Consumer
+    _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config);
+    List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST,
+        Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY))));
+    _partitions = kafkaTopics.get(0).getPartitions();
+    _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+
+    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
+    _versionWriter = new FixedSchemaVersionWriter();
+  }
+
+  public SimpleKafkaSpecConsumer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecConsumer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
+    initializeWatermarks();
+    this.currentPartitionIdx = -1;
+    while (!allPartitionsFinished()) {
+      if (currentPartitionFinished()) {
+        moveToNextPartition();
+        continue;
+      }
+      if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+        try {
+          this.messageIterator = fetchNextMessageBuffer();
+        } catch (Exception e) {
+          log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
+              getCurrentPartition()), e);
+          moveToNextPartition();
+          continue;
+        }
+        if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+          moveToNextPartition();
+          continue;
+        }
+      }
+      while (!currentPartitionFinished()) {
+        if (!this.messageIterator.hasNext()) {
+          break;
+        }
+
+        KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
+
+        // Even though we ask Kafka to give us a message buffer starting from offset x, it may
+        // return a buffer that starts from offset smaller than x, so we need to skip messages
+        // until we get to x.
+        if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
+          continue;
+        }
+
+        _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
+        try {
+          final AvroJobSpec record;
+
+          if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
+            record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
+          } else if (nextValidMessage instanceof DecodeableKafkaRecord){
+            record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue();
+          } else {
+            throw new IllegalStateException(
+                "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
+                    + " or DecodeableKafkaRecord");
+          }
+
+          JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
+
+          Properties props = new Properties();
+          props.putAll(record.getProperties());
+          jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
+              .withDescription(record.getDescription()).withConfigAsProperties(props);
+
+          if (!record.getTemplateUri().isEmpty()) {
+            jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
+          }
+
+          String verbName = record.getMetadata().get(VERB_KEY);
+          Verb verb = Verb.valueOf(verbName);
+
+          changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build()));
+        } catch (Throwable t) {
+          log.error("Could not decode record at partition " + this.currentPartitionIdx +
+              " offset " + nextValidMessage.getOffset());
+        }
+      }
+    }
+
+    return new CompletedFuture(changesSpecs, null);
+  }
+
+  private void initializeWatermarks() {
+    initializeLowWatermarks();
+    initializeHighWatermarks();
+  }
+
+  private void initializeLowWatermarks() {
+    try {
+      int i=0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        if (isFirstRun) {
+          long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition);
+          _lowWatermark.set(i, earliestOffset);
+        } else {
+          _lowWatermark.set(i, _highWatermark.get(i));
+        }
+        i++;
+      }
+      isFirstRun = false;
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void initializeHighWatermarks() {
+    try {
+      int i=0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition);
+        _highWatermark.set(i, latestOffset);
+        i++;
+      }
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean allPartitionsFinished() {
+    return this.currentPartitionIdx >= _nextWatermark.size();
+  }
+
+  private boolean currentPartitionFinished() {
+    if (this.currentPartitionIdx == -1) {
+      return true;
+    } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private int moveToNextPartition() {
+    this.messageIterator = null;
+    return this.currentPartitionIdx ++;
+  }
+
+  private KafkaPartition getCurrentPartition() {
+    return _partitions.get(this.currentPartitionIdx);
+  }
+
+  private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
+    return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx),
+        _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
+  }
+
+  private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
+    InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
+    _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+    Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
+
+    return _reader.read(null, decoder);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _kafka08Consumer.close();
+  }
+}
\ No newline at end of file
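
A minimal polling sketch for the new consumer (illustrative only, not part of this patch): the topic name "gobblin-job-specs" and the class name are made up, and the broker settings expected by Kafka08ConsumerClient.Factory are assumed to be supplied by the loaded Config.

    import java.util.List;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigValueFactory;

    import org.apache.commons.lang3.tuple.Pair;

    import org.apache.gobblin.runtime.api.Spec;
    import org.apache.gobblin.runtime.api.SpecExecutor;
    import org.apache.gobblin.service.SimpleKafkaSpecConsumer;
    import org.apache.gobblin.service.SimpleKafkaSpecExecutor;

    public class SpecConsumerPollSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical topic name; Kafka broker settings are assumed to come from ConfigFactory.load().
        Config config = ConfigFactory.load().withValue(
            SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY,
            ConfigValueFactory.fromAnyRef("gobblin-job-specs"));

        try (SimpleKafkaSpecConsumer consumer = new SimpleKafkaSpecConsumer(config)) {
          // changedSpecs() scans every partition between the stored watermarks and returns an
          // already-completed Future, so get() simply unwraps the accumulated (Verb, Spec) pairs.
          List<Pair<SpecExecutor.Verb, Spec>> changes = consumer.changedSpecs().get();
          for (Pair<SpecExecutor.Verb, Spec> change : changes) {
            System.out.println(change.getLeft() + " -> " + change.getRight().getUri());
          }
        }
      }
    }
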

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
new file mode 100644
index 0000000..8545bf6
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+
+/**
+ * A {@link SpecExecutor} that uses Kafka as the communication mechanism.
+ */
+public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
+  public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
+
+
+  protected static final String VERB_KEY = "Verb";
+
+  private SpecProducer<Spec> specProducer;
+
+  public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    specProducer = new SimpleKafkaSpecProducer(config, log);
+  }
+
+  /**
+   * Constructor without logging, for the simple use case.
+   * @param config configuration used to build the underlying {@link SimpleKafkaSpecProducer}
+   */
+  public SimpleKafkaSpecExecutor(Config config) {
+    this(config, Optional.absent());
+  }
+
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    return new CompletedFuture<>(this.specProducer, null);
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("SimpleKafkaSpecExecutor with URI: " + specExecutorInstanceUri, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    optionalCloser = Optional.of(Closer.create());
+    specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    if (optionalCloser.isPresent()) {
+      optionalCloser.get().close();
+    } else {
+      log.warn("There's no Closer existed in " + this.getClass().getName());
+    }
+  }
+
+  public static class SpecExecutorInstanceDataPacket implements Serializable {
+
+    protected Verb _verb;
+    protected URI _uri;
+    protected Spec _spec;
+
+    public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) {
+      _verb = verb;
+      _uri = uri;
+      _spec = spec;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec);
+    }
+  }
+}
\ No newline at end of file
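
A sketch of publishing a Spec through the new executor (illustrative only, not part of this patch): it assumes the loaded Config carries the Kafka producer settings expected by SimpleKafkaSpecProducer plus spec.kafka.topics, and the class name and flow URI below are made up. The unchecked cast reflects the raw SpecProducer in getProducer()'s signature.

    import java.net.URI;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.runtime.api.JobSpec;
    import org.apache.gobblin.runtime.api.Spec;
    import org.apache.gobblin.runtime.api.SpecProducer;
    import org.apache.gobblin.service.SimpleKafkaSpecExecutor;

    public class SpecExecutorPublishSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        // Hypothetical config source; Kafka settings are assumed to be present in what is loaded here.
        Config config = ConfigFactory.load();

        SimpleKafkaSpecExecutor executor = new SimpleKafkaSpecExecutor(config);

        // getProducer() wraps the producer created in the constructor in a CompletedFuture.
        SpecProducer<Spec> producer = (SpecProducer<Spec>) executor.getProducer().get();

        // Publish an illustrative Spec; the URI is made up.
        Spec spec = JobSpec.builder(new URI("gobblin-flow:/examples/hello-world")).build();
        producer.addSpec(spec).get();
      }
    }
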

