gobblin-commits mailing list archives

From a...@apache.org
Subject [3/4] incubator-gobblin git commit: [GOBBLIN-3] Multi-hop flow compiler implementation
Date Tue, 12 Sep 2017 09:30:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java
deleted file mode 100644
index a60d8e2..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import com.google.common.util.concurrent.AbstractIdleService;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-
-
-public class SimpleKafkaSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
-  public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
-  protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
-  protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
-
-  // Executor Instance
-  protected final Config _config;
-  protected final Logger _log;
-  protected final URI _specExecutorInstanceUri;
-  protected final Map<String, String> _capabilities;
-
-  protected static final String VERB_KEY = "Verb";
-
-  public SimpleKafkaSpecExecutorInstance(Config config, Optional<Logger> log) {
-    _config = config;
-    _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    try {
-      _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
-          "NA"));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-    _capabilities = Maps.newHashMap();
-    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
-      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
-      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
-      for (String capability : capabilities) {
-        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
-        Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
-            + "per capability, found: " + currentCapability);
-        _capabilities.put(currentCapability.get(0), currentCapability.get(1));
-      }
-    }
-  }
-
-  @Override
-  public URI getUri() {
-    return _specExecutorInstanceUri;
-  }
-
-  @Override
-  public Future<String> getDescription() {
-    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null);
-  }
-
-  @Override
-  public Future<Config> getConfig() {
-    return new CompletedFuture<>(_config, null);
-  }
-
-  @Override
-  public Future<String> getHealth() {
-    return new CompletedFuture<>("Healthy", null);
-  }
-
-  @Override
-  public Future<? extends Map<String, String>> getCapabilities() {
-    return new CompletedFuture<>(_capabilities, null);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    // nothing to do in default implementation
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // nothing to do in default implementation
-  }
-
-  public static class SpecExecutorInstanceDataPacket implements Serializable {
-
-    protected Verb _verb;
-    protected URI _uri;
-    protected Spec _spec;
-
-    public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) {
-      _verb = verb;
-      _uri = uri;
-      _spec = spec;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec);
-    }
-  }
-}
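
For reference, the constructor of the deleted base class parses the specexecutor.instance.capabilities value as comma-separated source:destination pairs. A standalone sketch of just that parsing logic, using the same Guava Splitter settings (the demo class name is illustrative):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import com.google.common.base.Preconditions;
    import com.google.common.base.Splitter;

    public class CapabilityParsingDemo {
      private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
      private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();

      // Parses "source1:dest1, source2:dest2" into a capability map,
      // mirroring the constructor above.
      static Map<String, String> parseCapabilities(String capabilitiesStr) {
        Map<String, String> capabilities = new LinkedHashMap<>();
        for (String capability : SPLIT_BY_COMMA.splitToList(capabilitiesStr)) {
          List<String> pair = SPLIT_BY_COLON.splitToList(capability);
          Preconditions.checkArgument(pair.size() == 2,
              "Only one source:destination pair is supported per capability, found: " + pair);
          capabilities.put(pair.get(0), pair.get(1));
        }
        return capabilities;
      }

      public static void main(String[] args) {
        System.out.println(parseCapabilities("hdfs:adl, kafka:hdfs")); // {hdfs=adl, kafka=hdfs}
      }
    }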

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java
deleted file mode 100644
index 90960e7..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
-import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
-import org.apache.gobblin.kafka.client.Kafka08ConsumerClient;
-import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
-import org.apache.gobblin.util.CompletedFuture;
-
-public class SimpleKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance
-    implements SpecExecutorInstanceConsumer<Spec>, Closeable {
-
-  // Consumer
-  protected final GobblinKafkaConsumerClient _kafka08Consumer;
-  protected final List<KafkaPartition> _partitions;
-  protected final List<Long> _lowWatermark;
-  protected final List<Long> _nextWatermark;
-  protected final List<Long> _highWatermark;
-
-  private Iterator<KafkaConsumerRecord> messageIterator = null;
-  private int currentPartitionIdx = -1;
-  private boolean isFirstRun = true;
-
-  private final BinaryDecoder _decoder;
-  private final SpecificDatumReader<AvroJobSpec> _reader;
-  private final SchemaVersionWriter<?> _versionWriter;
-
-  public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Optional<Logger> log) {
-    super(config, log);
-
-    // Consumer
-    _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config);
-    List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST,
-        Lists.newArrayList(Pattern.compile(config.getString(SPEC_KAFKA_TOPICS_KEY))));
-    _partitions = kafkaTopics.get(0).getPartitions();
-    _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-    _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-    _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-
-    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
-    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
-    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
-    _versionWriter = new FixedSchemaVersionWriter();
-  }
-
-  public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Logger log) {
-    this(config, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public SimpleKafkaSpecExecutorInstanceConsumer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  @Override
-  public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() {
-    List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>();
-    initializeWatermarks();
-    this.currentPartitionIdx = -1;
-    while (!allPartitionsFinished()) {
-      if (currentPartitionFinished()) {
-        moveToNextPartition();
-        continue;
-      }
-      if (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        try {
-          this.messageIterator = fetchNextMessageBuffer();
-        } catch (Exception e) {
-          _log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
-              getCurrentPartition()), e);
-          moveToNextPartition();
-          continue;
-        }
-        if (this.messageIterator == null || !this.messageIterator.hasNext()) {
-          moveToNextPartition();
-          continue;
-        }
-      }
-      while (!currentPartitionFinished()) {
-        if (!this.messageIterator.hasNext()) {
-          break;
-        }
-
-        KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
-
-        // Even though we ask Kafka to give us a message buffer starting from offset x, it may
-        // return a buffer that starts from offset smaller than x, so we need to skip messages
-        // until we get to x.
-        if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
-          continue;
-        }
-
-        _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
-        try {
-          final AvroJobSpec record;
-
-          if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
-            record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
-          } else if (nextValidMessage instanceof DecodeableKafkaRecord){
-            record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue();
-          } else {
-            throw new IllegalStateException(
-                "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
-                    + " or DecodeableKafkaRecord");
-          }
-
-          JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
-
-          Properties props = new Properties();
-          props.putAll(record.getProperties());
-          jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
-              .withDescription(record.getDescription()).withConfigAsProperties(props);
-
-          if (!record.getTemplateUri().isEmpty()) {
-            jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
-          }
-
-          String verbName = record.getMetadata().get(VERB_KEY);
-          Verb verb = Verb.valueOf(verbName);
-
-          changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build()));
-        } catch (Throwable t) {
-          _log.error("Could not decode record at partition " + this.currentPartitionIdx +
-              " offset " + nextValidMessage.getOffset());
-        }
-      }
-    }
-
-    return new CompletedFuture(changesSpecs, null);
-  }
-
-  private void initializeWatermarks() {
-    initializeLowWatermarks();
-    initializeHighWatermarks();
-  }
-
-  private void initializeLowWatermarks() {
-    try {
-      int i=0;
-      for (KafkaPartition kafkaPartition : _partitions) {
-        if (isFirstRun) {
-          long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition);
-          _lowWatermark.set(i, earliestOffset);
-        } else {
-          _lowWatermark.set(i, _highWatermark.get(i));
-        }
-        i++;
-      }
-      isFirstRun = false;
-    } catch (KafkaOffsetRetrievalFailureException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void initializeHighWatermarks() {
-    try {
-      int i=0;
-      for (KafkaPartition kafkaPartition : _partitions) {
-        long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition);
-        _highWatermark.set(i, latestOffset);
-        i++;
-      }
-    } catch (KafkaOffsetRetrievalFailureException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private boolean allPartitionsFinished() {
-    return this.currentPartitionIdx >= _nextWatermark.size();
-  }
-
-  private boolean currentPartitionFinished() {
-    if (this.currentPartitionIdx == -1) {
-      return true;
-    } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private int moveToNextPartition() {
-    this.messageIterator = null;
-    return this.currentPartitionIdx ++;
-  }
-
-  private KafkaPartition getCurrentPartition() {
-    return _partitions.get(this.currentPartitionIdx);
-  }
-
-  private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
-    return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx),
-        _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
-  }
-
-  private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
-    InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
-    _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
-
-    Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
-
-    return _reader.read(null, decoder);
-  }
-
-  @Override
-  public void close() throws IOException {
-    _kafka08Consumer.close();
-  }
-}
\ No newline at end of file
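
The consumer above keeps per-partition low/next/high watermark lists and, as the inline comment notes, skips any fetched record whose offset is below the next watermark, because a Kafka 0.8 fetch may return a buffer that starts before the requested offset. A self-contained sketch of that skipping logic (Rec is a hypothetical stand-in for KafkaConsumerRecord):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class WatermarkSkipDemo {
      // Stand-in for KafkaConsumerRecord: only the offset matters here.
      static class Rec {
        final long offset;
        Rec(long offset) { this.offset = offset; }
        long getNextOffset() { return offset + 1; }
      }

      // Advances the next watermark over a fetched buffer, skipping records
      // below the requested offset and stopping at the high watermark.
      static long drain(List<Rec> buffer, long nextWatermark, long highWatermark) {
        Iterator<Rec> it = buffer.iterator();
        while (nextWatermark < highWatermark && it.hasNext()) {
          Rec rec = it.next();
          if (rec.offset < nextWatermark) {
            continue; // buffer may start before the requested offset; skip
          }
          nextWatermark = rec.getNextOffset();
          // ... decode and process the record here ...
        }
        return nextWatermark;
      }

      public static void main(String[] args) {
        // Asked for offset 5, but the buffer starts at 3; records 3 and 4 are skipped.
        List<Rec> buffer = Arrays.asList(new Rec(3), new Rec(4), new Rec(5), new Rec(6));
        System.out.println(drain(buffer, 5, 7)); // prints 7
      }
    }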

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java
deleted file mode 100644
index cdafe06..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Future;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-import org.apache.avro.mapred.AvroJob;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.kafka.writer.Kafka08DataWriter;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.WriteCallback;
-
-
-@NotThreadSafe
-public class SimpleKafkaSpecExecutorInstanceProducer extends SimpleKafkaSpecExecutorInstance
-    implements SpecExecutorInstanceProducer<Spec>, Closeable  {
-
-  // Producer
-  protected Kafka08DataWriter<byte[]> _kafka08Producer;
-  private final AvroSerializer<AvroJobSpec> _serializer;
-
-  public SimpleKafkaSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
-    super(config, log);
-
-    try {
-      _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create AvroBinarySerializer", e);
-    }
-  }
-
-  public SimpleKafkaSpecExecutorInstanceProducer(Config config, Logger log) {
-    this(config, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public SimpleKafkaSpecExecutorInstanceProducer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  @Override
-  public Future<?> addSpec(Spec addedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, Verb.ADD);
-
-    _log.info("Adding Spec: " + addedSpec + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<?> updateSpec(Spec updatedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, Verb.UPDATE);
-
-    _log.info("Updating Spec: " + updatedSpec + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
-        .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build();
-
-    _log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<? extends List<Spec>> listSpecs() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() throws IOException {
-     _kafka08Producer.close();
-  }
-
-  private Kafka08DataWriter<byte[]> getKafka08Producer() {
-    if (null == _kafka08Producer) {
-      _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config));
-    }
-    return _kafka08Producer;
-  }
-
-  private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) {
-    if (spec instanceof JobSpec) {
-      JobSpec jobSpec = (JobSpec) spec;
-      AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
-
-      avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
-          .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
-          .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
-
-      if (jobSpec.getTemplateURI().isPresent()) {
-        avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
-      }
-
-      return avroJobSpecBuilder.build();
-    } else {
-      throw new RuntimeException("Unsupported spec type " + spec.getClass());
-    }
-  }
-}
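
The producer encodes the mutation verb in the AvroJobSpec metadata map under the "Verb" key (VERB_KEY), and the consumer above reads it back to rebuild the (verb, spec) pair; the replacement producer below keeps the same convention. A minimal round-trip sketch using only builder and getter calls that appear in this commit:

    import java.util.Properties;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Maps;

    import org.apache.gobblin.runtime.api.JobSpec;
    import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
    import org.apache.gobblin.util.ConfigUtils;

    public class VerbMetadataDemo {
      public static void main(String[] args) {
        JobSpec jobSpec = JobSpec.builder("/demo/spec")
            .withConfig(ConfigUtils.propertiesToConfig(new Properties()))
            .withVersion("1")
            .withDescription("demo")
            .build();

        AvroJobSpec avro = AvroJobSpec.newBuilder()
            .setUri(jobSpec.getUri().toString())
            .setVersion(jobSpec.getVersion())
            .setDescription(jobSpec.getDescription())
            .setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
            .setMetadata(ImmutableMap.of("Verb", "ADD")) // VERB_KEY, as in the producers
            .build();

        // The consumer side looks the verb up again before rebuilding the JobSpec.
        System.out.println(avro.getMetadata().get("Verb")); // ADD
      }
    }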

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
new file mode 100644
index 0000000..13aae6f
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.slf4j.Logger;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.kafka.writer.Kafka08DataWriter;
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.WriteCallback;
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NotThreadSafe
+public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
+
+  // Producer
+  protected Kafka08DataWriter<byte[]> _kafka08Producer;
+  private final AvroSerializer<AvroJobSpec> _serializer;
+  private Config _config;
+
+  public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
+    _config = config;
+    try {
+      _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create AvroBinarySerializer", e);
+    }
+  }
+
+  public SimpleKafkaSpecProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
+
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
+
+    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+
+    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build();
+
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
+
+    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    _kafka08Producer.close();
+  }
+
+  private Kafka08DataWriter<byte[]> getKafka08Producer() {
+    if (null == _kafka08Producer) {
+      _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config));
+    }
+    return _kafka08Producer;
+  }
+
+  private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) {
+    if (spec instanceof JobSpec) {
+      JobSpec jobSpec = (JobSpec) spec;
+      AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
+
+      avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
+          .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
+          .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
+
+      if (jobSpec.getTemplateURI().isPresent()) {
+        avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
+      }
+
+      return avroJobSpecBuilder.build();
+    } else {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+  }
+}
\ No newline at end of file
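
A hedged usage sketch for the new producer: broker address, topic name, and spec URI are placeholders, the property keys mirror the test added at the end of this commit, and the demo assumes it lives in (or imports from) org.apache.gobblin.service:

    import java.util.Properties;

    import com.typesafe.config.Config;

    import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
    import org.apache.gobblin.runtime.api.JobSpec;
    import org.apache.gobblin.util.ConfigUtils;

    public class SpecProducerUsageDemo {
      public static void main(String[] args) throws Exception {
        // Placeholder broker/topic; the keys follow the test class below.
        Properties props = new Properties();
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "SpecTopic");
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
            "localhost:9092");
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        Config config = ConfigUtils.propertiesToConfig(props);

        JobSpec spec = JobSpec.builder("/demo/addedSpec")
            .withConfig(ConfigUtils.propertiesToConfig(new Properties()))
            .withVersion("1")
            .withDescription("demo spec")
            .build();

        try (SimpleKafkaSpecProducer producer = new SimpleKafkaSpecProducer(config)) {
          producer.addSpec(spec).get(); // blocks until the serialized AvroJobSpec is written
        }
      }
    }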

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
new file mode 100644
index 0000000..fd42211
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link SpecConsumer} that consumes job specs from Kafka in a streaming manner.
+ * Extends {@link AbstractIdleService} for starting up and shutting down.
+ */
+@Slf4j
+public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
+  public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
+  private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
+  private final AvroJobSpecKafkaJobMonitor _jobMonitor;
+  private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue;
+
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
+    String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
+    Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
+        KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+
+    try {
+      _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory())
+          .forConfig(config.withFallback(defaults), jobCatalog);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create job monitor", e);
+    }
+
+    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, SPEC_STREAMING_BLOCKING_QUEUE_SIZE,
+        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+
+    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
+    jobCatalog.addListener(new JobSpecListener());
+  }
+
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
+    this(config, jobCatalog, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) {
+    this(config, jobCatalog, Optional.<Logger>absent());
+  }
+
+  /**
+   * This method returns job specs received from Kafka. It will block if there are no job specs.
+   * @return a list of (verb, job spec) pairs.
+   */
+  @Override
+  public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() {
+    List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>();
+
+    try {
+      Pair<Verb, Spec> specPair = _jobSpecQueue.take();
+
+      do {
+        changesSpecs.add(specPair);
+
+        // if there are more elements then pass them along in this call
+        specPair = _jobSpecQueue.poll();
+      } while (specPair != null);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    return new CompletedFuture(changesSpecs, null);
+  }
+
+  @Override
+  protected void startUp() {
+    _jobMonitor.startAsync().awaitRunning();
+  }
+
+  @Override
+  protected void shutDown() {
+    _jobMonitor.stopAsync().awaitTerminated();
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutDown();
+  }
+
+  /**
+   * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of
+   * {@link StreamingKafkaSpecConsumer}
+   */
+  protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
+    public JobSpecListener() {
+      super(StreamingKafkaSpecConsumer.this.log);
+    }
+
+    @Override public void onAddJob(JobSpec addedJob) {
+      super.onAddJob(addedJob);
+
+      try {
+        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
+      super.onDeleteJob(deletedJobURI, deletedJobVersion);
+      try {
+        JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
+
+        Properties props = new Properties();
+        jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
+
+        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build()));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override public void onUpdateJob(JobSpec updatedJob) {
+      super.onUpdateJob(updatedJob);
+
+      try {
+        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
\ No newline at end of file
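
changedSpecs() above blocks on the first queue element with take() and then drains any further elements that are immediately available with poll(), so one call can return a batch. The same pattern in isolation (a generic sketch, not Gobblin API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueDrainDemo {
      // Blocks for at least one element, then drains whatever else is ready.
      static <T> List<T> takeBatch(BlockingQueue<T> queue) throws InterruptedException {
        List<T> batch = new ArrayList<>();
        T item = queue.take();   // blocks until something arrives
        do {
          batch.add(item);
          item = queue.poll();   // non-blocking; null when the queue is empty
        } while (item != null);
        return batch;
      }

      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.put("ADD:/foo");
        q.put("DELETE:/bar");
        System.out.println(takeBatch(q)); // [ADD:/foo, DELETE:/bar]
      }
    }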

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java
deleted file mode 100644
index ac7fe03..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import org.apache.gobblin.util.ConfigUtils;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer;
-import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
-import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
-import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
-import org.apache.gobblin.util.CompletedFuture;
-
-
-/**
- * SpecExecutorInstanceConsumer that consumes from kafka in a streaming manner
- */
-public class StreamingKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance
-    implements SpecExecutorInstanceConsumer<Spec>, Closeable {
-  public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
-  private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
-  private final AvroJobSpecKafkaJobMonitor _jobMonitor;
-  private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue;
-
-  public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
-    super(config, log);
-    String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
-    Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
-        KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
-
-    try {
-      _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory())
-          .forConfig(config.withFallback(defaults), jobCatalog);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create job monitor", e);
-    }
-
-    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
-        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
-
-    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
-    jobCatalog.addListener(new JobSpecListener());
-  }
-
-  public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
-    this(config, jobCatalog, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog) {
-    this(config, jobCatalog, Optional.<Logger>absent());
-  }
-
-  /**
-   * This method returns job specs receive from Kafka. It will block if there are no job specs.
-   * @return list of (verb, jobspecs) pairs.
-   */
-  @Override
-  public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() {
-    List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>();
-
-    try {
-      Pair<Verb, Spec> specPair = _jobSpecQueue.take();
-
-      do {
-        changesSpecs.add(specPair);
-
-        // if there are more elements then pass them along in this call
-        specPair = _jobSpecQueue.poll();
-      } while (specPair != null);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-
-    return new CompletedFuture(changesSpecs, null);
-  }
-
-  @Override
-  protected void startUp() {
-    _jobMonitor.startAsync().awaitRunning();
-  }
-
-  @Override
-  protected void shutDown() {
-    _jobMonitor.stopAsync().awaitTerminated();
-  }
-
-  @Override
-  public void close() throws IOException {
-    shutDown();
-  }
-
-  /**
-   * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of
-   * {@link StreamingKafkaSpecExecutorInstanceConsumer}
-   */
-  protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
-    public JobSpecListener() {
-      super(StreamingKafkaSpecExecutorInstanceConsumer.this._log);
-    }
-
-    @Override public void onAddJob(JobSpec addedJob) {
-      super.onAddJob(addedJob);
-
-      try {
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
-      super.onDeleteJob(deletedJobURI, deletedJobVersion);
-      try {
-        JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
-
-        Properties props = new Properties();
-        jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
-
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build()));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override public void onUpdateJob(JobSpec updatedJob) {
-      super.onUpdateJob(updatedJob);
-
-      try {
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java
deleted file mode 100644
index 58c5d72..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.Test;
-
-import com.google.common.io.Closer;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
-import org.apache.gobblin.metrics.reporter.KafkaTestBase;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.WriteResponse;
-
-
-@Slf4j
-public class SimpleKafkaSpecExecutorInstanceTest extends KafkaTestBase {
-
-  public static final String TOPIC = SimpleKafkaSpecExecutorInstanceTest.class.getSimpleName();
-
-  private Closer _closer;
-  private Properties _properties;
-  private SimpleKafkaSpecExecutorInstanceProducer _seip;
-  private SimpleKafkaSpecExecutorInstanceConsumer _seic;
-  private String _kafkaBrokers;
-
-  public SimpleKafkaSpecExecutorInstanceTest()
-      throws InterruptedException, RuntimeException {
-    super(TOPIC);
-    _kafkaBrokers = "localhost:" + kafkaPort;
-    log.info("Going to use Kakfa broker: " + _kafkaBrokers);
-  }
-
-  @Test
-  public void testAddSpec() throws Exception {
-    _closer = Closer.create();
-    _properties = new Properties();
-
-    // Properties for Producer
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-
-    // Properties for Consumer
-    _properties.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
-    _properties.setProperty(SimpleKafkaSpecExecutorInstanceProducer.SPEC_KAFKA_TOPICS_KEY, TOPIC);
-
-    // SEI Producer
-    _seip = _closer.register(new SimpleKafkaSpecExecutorInstanceProducer(ConfigUtils.propertiesToConfig(_properties)));
-
-    String addedSpecUriString = "/foo/bar/addedSpec";
-    Spec spec = initJobSpec(addedSpecUriString);
-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    try {
-      Thread.sleep(1000);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    _seic = _closer.register(new SimpleKafkaSpecExecutorInstanceConsumer(ConfigUtils.propertiesToConfig(_properties)));
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.ADD), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  @Test (dependsOnMethods = "testAddSpec")
-  public void testUpdateSpec() throws Exception {
-    String updatedSpecUriString = "/foo/bar/updatedSpec";
-    Spec spec = initJobSpec(updatedSpecUriString);
-    WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    try {
-      Thread.sleep(1000);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.UPDATE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  @Test (dependsOnMethods = "testUpdateSpec")
-  public void testDeleteSpec() throws Exception {
-    String deletedSpecUriString = "/foo/bar/deletedSpec";
-    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    try {
-      Thread.sleep(1000);
-    } catch(InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.DELETE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  @Test (dependsOnMethods = "testDeleteSpec")
-  public void testResetConsumption() throws Exception {
-    SimpleKafkaSpecExecutorInstanceConsumer seic = _closer
-        .register(new SimpleKafkaSpecExecutorInstanceConsumer(ConfigUtils.propertiesToConfig(_properties)));
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 3, "Consumption was reset, we should see all events");
-  }
-
-  private JobSpec initJobSpec(String specUri) {
-    Properties properties = new Properties();
-    return JobSpec.builder(specUri)
-        .withConfig(ConfigUtils.propertiesToConfig(properties))
-        .withVersion("1")
-        .withDescription("Spec Description")
-        .build();
-  }
-
-  @AfterClass
-  public void after() {
-    try {
-      _closer.close();
-    } catch(Exception e) {
-      log.error("Failed to close SEIC and SEIP.", e);
-    }
-    try {
-      close();
-    } catch(Exception e) {
-      log.error("Failed to close Kafka server.", e);
-    }
-  }
-
-  @AfterSuite
-  public void afterSuite() {
-    closeServer();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java
new file mode 100644
index 0000000..5567c18
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.io.Closer;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.metrics.reporter.KafkaTestBase;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SimpleKafkaSpecExecutorTest extends KafkaTestBase {
+
+  public static final String TOPIC = SimpleKafkaSpecExecutorTest.class.getSimpleName();
+
+  private Closer _closer;
+  private Properties _properties;
+  private SimpleKafkaSpecProducer _seip;
+  private SimpleKafkaSpecConsumer _seic;
+  private String _kafkaBrokers;
+
+  public SimpleKafkaSpecExecutorTest()
+      throws InterruptedException, RuntimeException {
+    super(TOPIC);
+    _kafkaBrokers = "localhost:" + kafkaPort;
+    log.info("Going to use Kakfa broker: " + _kafkaBrokers);
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    _closer = Closer.create();
+    _properties = new Properties();
+
+    // Properties for Producer
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+    // Properties for Consumer
+    _properties.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    _properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC);
+
+    // SEI Producer
+    _seip = _closer.register(new SimpleKafkaSpecProducer(ConfigUtils.propertiesToConfig(_properties)));
+
+    String addedSpecUriString = "/foo/bar/addedSpec";
+    Spec spec = initJobSpec(addedSpecUriString);
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    _seic = _closer.register(new SimpleKafkaSpecConsumer(ConfigUtils.propertiesToConfig(_properties)));
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testUpdateSpec() throws Exception {
+    String updatedSpecUriString = "/foo/bar/updatedSpec";
+    Spec spec = initJobSpec(updatedSpecUriString);
+    WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testUpdateSpec")
+  public void testDeleteSpec() throws Exception {
+    String deletedSpecUriString = "/foo/bar/deletedSpec";
+    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testDeleteSpec")
+  public void testResetConsumption() throws Exception {
+    SimpleKafkaSpecConsumer seic = _closer
+        .register(new SimpleKafkaSpecConsumer(ConfigUtils.propertiesToConfig(_properties)));
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption was reset; we should see all events");
+  }
+
+  private JobSpec initJobSpec(String specUri) {
+    Properties properties = new Properties();
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      _closer.close();
+    } catch(Exception e) {
+      log.error("Failed to close SEIC and SEIP.", e);
+    }
+    try {
+      close();
+    } catch(Exception e) {
+      log.error("Failed to close Kafka server.", e);
+    }
+  }
+
+  @AfterSuite
+  public void afterSuite() {
+    closeServer();
+  }
+}
\ No newline at end of file

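For context on the renamed API, the round trip this test exercises can be sketched outside the test harness as follows. This is a minimal sketch: the broker address "localhost:9092" and topic "demoSpecs" are illustrative placeholders, and the JobSpec mirrors initJobSpec() above.

Properties props = new Properties();
// Producer side: topic plus pass-through Kafka producer settings.
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "demoSpecs");
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", "localhost:9092");
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
// Consumer side: brokers and the spec topic to watch.
props.setProperty(ConfigurationKeys.KAFKA_BROKERS, "localhost:9092");
props.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, "demoSpecs");
Config config = ConfigUtils.propertiesToConfig(props);

SimpleKafkaSpecProducer producer = new SimpleKafkaSpecProducer(config);
Spec spec = JobSpec.builder("/demo/spec")
    .withConfig(ConfigUtils.propertiesToConfig(new Properties()))
    .withVersion("1")
    .build();
producer.addSpec(spec).get();  // the Future completes once the ADD message is written

SimpleKafkaSpecConsumer consumer = new SimpleKafkaSpecConsumer(config);
for (Pair<SpecExecutor.Verb, Spec> event : consumer.changedSpecs().get()) {
  // Each entry pairs a Verb (ADD / UPDATE / DELETE) with the affected Spec.
  System.out.println(event.getKey() + " -> " + event.getValue().getUri());
}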
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java
deleted file mode 100644
index 939aafa..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.Test;
-
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
-import org.apache.gobblin.metrics.reporter.KafkaTestBase;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.WriteResponse;
-import lombok.extern.slf4j.Slf4j;
-
-
-@Slf4j
-public class StreamingKafkaSpecExecutorInstanceTest extends KafkaTestBase {
-
-  public static final String TOPIC = StreamingKafkaSpecExecutorInstanceTest.class.getSimpleName();
-
-  private Closer _closer;
-  private Properties _properties;
-  private SimpleKafkaSpecExecutorInstanceProducer _seip;
-  private StreamingKafkaSpecExecutorInstanceConsumer _seic;
-  private NonObservingFSJobCatalog _jobCatalog;
-  private String _kafkaBrokers;
-  private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorInstanceTest";
-  private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
-
-  public StreamingKafkaSpecExecutorInstanceTest()
-      throws InterruptedException, RuntimeException {
-    super(TOPIC);
-    _kafkaBrokers = "localhost:" + kafkaPort;
-    log.info("Going to use Kakfa broker: " + _kafkaBrokers);
-
-    cleanupTestDir();
-  }
-
-  private void cleanupTestDir() {
-    File testDir = new File(_TEST_DIR_PATH);
-
-    if (testDir.exists()) {
-      try {
-        FileUtils.deleteDirectory(testDir);
-      } catch (IOException e) {
-        throw new RuntimeException("Could not delete test directory", e);
-      }
-    }
-  }
-
-  @Test
-  public void testAddSpec() throws Exception {
-    _closer = Closer.create();
-    _properties = new Properties();
-
-    // Properties for Producer
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
-    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-
-    // Properties for Consumer
-    _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect);
-    _properties.setProperty(SimpleKafkaSpecExecutorInstanceProducer.SPEC_KAFKA_TOPICS_KEY, TOPIC);
-    _properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH);
-
-    Config config = ConfigUtils.propertiesToConfig(_properties);
-
-    // SEI Producer
-    _seip = _closer.register(new SimpleKafkaSpecExecutorInstanceProducer(config));
-
-    String addedSpecUriString = "/foo/bar/addedSpec";
-    Spec spec = initJobSpec(addedSpecUriString);
-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
-    _jobCatalog.startAsync().awaitRunning();
-
-    // SEI Consumer
-    _seic = _closer.register(new StreamingKafkaSpecExecutorInstanceConsumer(config, _jobCatalog));
-    _seic.startAsync().awaitRunning();
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.ADD), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  @Test (dependsOnMethods = "testAddSpec")
-  public void testUpdateSpec() throws Exception {
-    // update is only treated as an update for existing job specs
-    String updatedSpecUriString = "/foo/bar/addedSpec";
-    Spec spec = initJobSpec(updatedSpecUriString);
-    WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.UPDATE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  @Test (dependsOnMethods = "testUpdateSpec")
-  public void testDeleteSpec() throws Exception {
-    // delete needs to be on a job spec that exists to get notification
-    String deletedSpecUriString = "/foo/bar/addedSpec";
-    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
-    log.info("WriteResponse: " + writeResponse);
-
-    List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
-    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
-
-    Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
-    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.DELETE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
-    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
-  }
-
-  private JobSpec initJobSpec(String specUri) {
-    Properties properties = new Properties();
-    return JobSpec.builder(specUri)
-        .withConfig(ConfigUtils.propertiesToConfig(properties))
-        .withVersion("1")
-        .withDescription("Spec Description")
-        .build();
-  }
-
-  @AfterClass
-  public void after() {
-    try {
-      _closer.close();
-    } catch(Exception e) {
-      log.error("Failed to close SEIC and SEIP.", e);
-    }
-    try {
-      close();
-    } catch(Exception e) {
-      log.error("Failed to close Kafka server.", e);
-    }
-
-    if (_jobCatalog != null) {
-      _jobCatalog.stopAsync().awaitTerminated();
-    }
-
-    cleanupTestDir();
-  }
-
-  @AfterSuite
-  public void afterSuite() {
-    closeServer();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
new file mode 100644
index 0000000..e9c7ee6
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
+import org.apache.gobblin.metrics.reporter.KafkaTestBase;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
+
+  public static final String TOPIC = StreamingKafkaSpecExecutorTest.class.getSimpleName();
+
+  private Closer _closer;
+  private Properties _properties;
+  private SimpleKafkaSpecProducer _seip;
+  private StreamingKafkaSpecConsumer _seic;
+  private NonObservingFSJobCatalog _jobCatalog;
+  private String _kafkaBrokers;
+  private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
+  private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+
+  public StreamingKafkaSpecExecutorTest()
+      throws InterruptedException, RuntimeException {
+    super(TOPIC);
+    _kafkaBrokers = "localhost:" + kafkaPort;
+    log.info("Going to use Kakfa broker: " + _kafkaBrokers);
+
+    cleanupTestDir();
+  }
+
+  private void cleanupTestDir() {
+    File testDir = new File(_TEST_DIR_PATH);
+
+    if (testDir.exists()) {
+      try {
+        FileUtils.deleteDirectory(testDir);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not delete test directory", e);
+      }
+    }
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    _closer = Closer.create();
+    _properties = new Properties();
+
+    // Properties for Producer
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers);
+    _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+    // Properties for Consumer
+    _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect);
+    _properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC);
+    _properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH);
+
+    Config config = ConfigUtils.propertiesToConfig(_properties);
+
+    // SEI Producer
+    _seip = _closer.register(new SimpleKafkaSpecProducer(config));
+
+    String addedSpecUriString = "/foo/bar/addedSpec";
+    Spec spec = initJobSpec(addedSpecUriString);
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
+    _jobCatalog.startAsync().awaitRunning();
+
+    // SEI Consumer
+    _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
+    _seic.startAsync().awaitRunning();
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testUpdateSpec() throws Exception {
+    // update is only treated as an update for existing job specs
+    String updatedSpecUriString = "/foo/bar/addedSpec";
+    Spec spec = initJobSpec(updatedSpecUriString);
+    WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testUpdateSpec")
+  public void testDeleteSpec() throws Exception {
+    // delete needs to be on a job spec that exists to get notification
+    String deletedSpecUriString = "/foo/bar/addedSpec";
+    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  private JobSpec initJobSpec(String specUri) {
+    Properties properties = new Properties();
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      _closer.close();
+    } catch(Exception e) {
+      log.error("Failed to close SEIC and SEIP.", e);
+    }
+    try {
+      close();
+    } catch(Exception e) {
+      log.error("Failed to close Kafka server.", e);
+    }
+
+    if (_jobCatalog != null) {
+      _jobCatalog.stopAsync().awaitTerminated();
+    }
+
+    cleanupTestDir();
+  }
+
+  @AfterSuite
+  public void afterSuite() {
+    closeServer();
+  }
+}
\ No newline at end of file

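Unlike SimpleKafkaSpecConsumer, the streaming consumer is a Guava service that tails the topic and materializes each consumed spec into a backing job catalog. A minimal wiring sketch, with the ZooKeeper connect string and jobs directory as illustrative placeholders:

Properties props = new Properties();
props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "demoSpecs");
props.setProperty("jobSpecMonitor.kafka.zookeeper.connect", "localhost:2181");
props.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, "demoSpecs");
props.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", "/tmp/demo/jobs");
Config config = ConfigUtils.propertiesToConfig(props);

// The FS-backed catalog persists job specs under the configured directory.
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
jobCatalog.startAsync().awaitRunning();

// The consumer applies each ADD / UPDATE / DELETE event to the catalog.
StreamingKafkaSpecConsumer consumer = new StreamingKafkaSpecConsumer(config, jobCatalog);
consumer.startAsync().awaitRunning();

List<Pair<SpecExecutor.Verb, Spec>> events = consumer.changedSpecs().get();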
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
index bb617a7..aceb5dd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
@@ -24,16 +24,16 @@ import org.apache.gobblin.instrumented.Instrumentable;
 
 /***
  * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
- * and the mapping to {@link SpecExecutorInstance} that they can be run on.
+ * and the mapping to {@link SpecExecutor} that they can be run on.
  */
 public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
   /***
    * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
-   * and the mapping to {@link SpecExecutorInstance} that they can be run on.
+   * and the mapping to {@link SpecExecutor} that they can be run on.
    * @param spec {@link Spec} to compile.
-   * @return Map of materialized physical {@link Spec} and {@link SpecExecutorInstance}.
+   * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
    */
-  Map<Spec, SpecExecutorInstanceProducer> compileFlow(Spec spec);
+  Map<Spec, SpecExecutor> compileFlow(Spec spec);
 
   /***
    * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
@@ -41,4 +41,4 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable {
    * @return Map of {@link Spec} URI and {@link TopologySpec}
    */
   Map<URI, TopologySpec> getTopologySpecMap();
-}
+}
\ No newline at end of file

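To make the new contract concrete, a toy compileFlow() might look like the sketch below. It only illustrates the changed return type; the multi-hop compiler added by this commit performs actual path computation across the registered TopologySpecs.

// Illustration only: hand the logical flow Spec, unmodified, to the first
// executor found in the topology map. The SpecCatalogListener and
// Instrumentable methods of a real implementation are omitted.
public Map<Spec, SpecExecutor> compileFlow(Spec flowSpec) {
  Map<Spec, SpecExecutor> materializedSpecs = new LinkedHashMap<>();
  for (TopologySpec topologySpec : getTopologySpecMap().values()) {
    materializedSpecs.put(flowSpec, topologySpec.getSpecExecutor());
    break;
  }
  return materializedSpecs;
}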
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java
index da89b9e..c33f3de 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java
@@ -25,26 +25,25 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
 
 /**
- * Data model representation that describes a topology ie. a {@link SpecExecutorInstance} and its
+ * Data model representation that describes a topology, i.e., a {@link SpecExecutor} and its
  * capabilities tuple.
  *
  */
@@ -53,8 +52,8 @@ import org.apache.gobblin.util.ConfigUtils;
 @AllArgsConstructor
 @NotThreadSafe
 public class TopologySpec implements Configurable, Spec {
-  public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER = InMemorySpecExecutorInstanceProducer.class.getCanonicalName();
-  public static final String SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY = "specExecutorInstanceProducer.class";
+  public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE = InMemorySpecExecutor.class.getCanonicalName();
+  public static final String SPEC_EXECUTOR_INSTANCE_KEY = "specExecutorInstance.class";
 
   private static final long serialVersionUID = 6106269076155338046L;
 
@@ -78,26 +77,29 @@ public class TopologySpec implements Configurable, Spec {
 
   /** Underlying executor instance such as Gobblin cluster or Azkaban */
   @SuppressWarnings(justification="Initialization handled by getter", value="SE_TRANSIENT_FIELD_NOT_RESTORED")
-  transient SpecExecutorInstanceProducer specExecutorInstanceProducer;
+  transient SpecExecutor specExecutorInstance;
 
-  public SpecExecutorInstanceProducer getSpecExecutorInstanceProducer() {
-    if (null == specExecutorInstanceProducer) {
-      String specExecutorInstanceProducerClass = DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER;
-      if (config.hasPath(SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY)) {
-        specExecutorInstanceProducerClass = config.getString(SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY);
+  /**
+   * @return A {@link SpecExecutor} instance defined by the {@code <Technology, Location, Communication Mechanism>} tuple.
+   */
+  public synchronized SpecExecutor getSpecExecutor() {
+    if (null == specExecutorInstance) {
+      String specExecutorClass = DEFAULT_SPEC_EXECUTOR_INSTANCE;
+      if (config.hasPath(SPEC_EXECUTOR_INSTANCE_KEY)) {
+        specExecutorClass = config.getString(SPEC_EXECUTOR_INSTANCE_KEY);
       }
       try {
-        ClassAliasResolver<SpecExecutorInstanceProducer> _aliasResolver =
-            new ClassAliasResolver<>(SpecExecutorInstanceProducer.class);
-        specExecutorInstanceProducer = (SpecExecutorInstanceProducer) ConstructorUtils
+        ClassAliasResolver<SpecExecutor> _aliasResolver =
+            new ClassAliasResolver<>(SpecExecutor.class);
+        specExecutorInstance = (SpecExecutor) ConstructorUtils
             .invokeConstructor(Class.forName(_aliasResolver
-                .resolve(specExecutorInstanceProducerClass)), config);
+                .resolve(specExecutorClass)), config);
       } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
           | ClassNotFoundException e) {
         throw new RuntimeException(e);
       }
     }
-    return specExecutorInstanceProducer;
+    return specExecutorInstance;
   }
 
   public static TopologySpec.Builder builder(URI topologySpecUri) {
@@ -164,7 +166,7 @@ public class TopologySpec implements Configurable, Spec {
     private String version = "1";
     private Optional<String> description = Optional.absent();
     private Optional<URI> topologyCatalogURI = Optional.absent();
-    private Optional<SpecExecutorInstanceProducer> specExecutorInstanceProducer = Optional.absent();
+    private Optional<SpecExecutor> specExecutorInstance = Optional.absent();
 
     public Builder(URI topologySpecUri) {
       Preconditions.checkNotNull(topologySpecUri);
@@ -189,9 +191,9 @@ public class TopologySpec implements Configurable, Spec {
     public TopologySpec build() {
       Preconditions.checkNotNull(this.uri);
       Preconditions.checkNotNull(this.version);
+      return new TopologySpec(getURI(), getVersion(), getDescription(), getConfig(), getConfigAsProperties(),
+          getSpecExecutorInstance());
 
-      return new TopologySpec(getURI(), getVersion(), getDescription(), getConfig(),
-          getConfigAsProperties(), getSpecExceutorInstanceProducer());
     }
 
     /** The scheme and authority of the topology catalog URI are used to generate TopologySpec URIs from
@@ -314,17 +316,17 @@ public class TopologySpec implements Configurable, Spec {
       return this;
     }
 
-    public SpecExecutorInstanceProducer getSpecExceutorInstanceProducer() {
-      if (!this.specExecutorInstanceProducer.isPresent()) {
-        // TODO: Try to init SpecExecutorInstanceProducer from config if not initialized via builder.
-        throw new RuntimeException("SpecExecutorInstanceProducer not initialized.");
+    public SpecExecutor getSpecExecutorInstance() {
+      if (!this.specExecutorInstance.isPresent()) {
+        // TODO: Try to init SpecExecutor from config if not initialized via builder.
+        throw new RuntimeException("SpecExecutor not initialized.");
       }
-      return this.specExecutorInstanceProducer.get();
+      return this.specExecutorInstance.get();
     }
 
-    public TopologySpec.Builder withSpecExecutorInstanceProducer(SpecExecutorInstanceProducer specExecutorInstanceProducer) {
-      Preconditions.checkNotNull(specExecutorInstanceProducer);
-      this.specExecutorInstanceProducer = Optional.of(specExecutorInstanceProducer);
+    public TopologySpec.Builder withSpecExecutor(SpecExecutor specExecutor) {
+      Preconditions.checkNotNull(specExecutor);
+      this.specExecutorInstance = Optional.of(specExecutor);
       return this;
     }
   }
@@ -337,4 +339,4 @@ public class TopologySpec implements Configurable, Spec {
     return this.uri;
   }
 
-}
+}
\ No newline at end of file

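A usage sketch of the renamed builder API. The URI and config values are illustrative, and withConfig()/withVersion() are assumed to mirror the corresponding JobSpec.Builder setters; build() requires an executor, since getSpecExecutorInstance() throws when none is set.

Config config = ConfigFactory.parseMap(ImmutableMap.of(
    TopologySpec.SPEC_EXECUTOR_INSTANCE_KEY, InMemorySpecExecutor.class.getCanonicalName()));

TopologySpec topologySpec = TopologySpec.builder(new URI("gobblin-cluster://demo"))
    .withConfig(config)
    .withVersion("1")
    .withSpecExecutor(new InMemorySpecExecutor(config))
    .build();

// getSpecExecutor() lazily resolves specExecutorInstance.class from config
// (defaulting to InMemorySpecExecutor) when no instance was set up front.
SpecExecutor executor = topologySpec.getSpecExecutor();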
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index 1f2ce21..59733d3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -21,41 +21,25 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.Path;
-
-import com.codahale.metrics.Counter;
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValue;
 
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance;
-import org.apache.gobblin.runtime.api.SpecExecutorInstance.Verb;
+import org.apache.gobblin.runtime.api.SpecExecutor.Verb;
 import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.util.Either;
-import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -153,4 +137,4 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
       return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri()));
     }
   }
-}
+}
\ No newline at end of file

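The import change above reflects that the monitor now keys off SpecExecutor.Verb. By the convention visible in this hunk (DELETE returns only the spec URI), the dispatch can be sketched as follows; verbName, jobSpec, and Either.left() are assumptions mirroring the right() call retained in the file:

// Sketch: DELETE yields the URI (right side of Either) so the catalog entry
// can be removed; ADD and UPDATE yield the full JobSpec (left side).
Collection<Either<JobSpec, URI>> parsed;
if (Verb.DELETE.name().equals(verbName)) {  // verbName: read from message metadata (assumed)
  parsed = Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri()));
} else {
  parsed = Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
}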
