gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject [1/2] incubator-gobblin git commit: [GOBBLIN-292] Add kafka09 support for service and cluster job spec communication
Date Fri, 20 Oct 2017 00:07:37 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8f32ab4c1 -> 0ee3cfdca


[GOBBLIN-292] Add kafka09 support for service and cluster job spec communication

Add pluggable consumer client to SimpleKafkaSpecConsumer.
Add pluggable producer to SimpleKafkaSpecProducer.
Move common logic into the gobblin-service-kafka module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/590b872b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/590b872b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/590b872b

Branch: refs/heads/master
Commit: 590b872b094b5a27b2a12e2bc66d1a7514309bb5
Parents: 626d312
Author: Hung Tran <hutran@linkedin.com>
Authored: Thu Oct 19 13:09:35 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Thu Oct 19 13:09:35 2017 -0700

----------------------------------------------------------------------
 gobblin-modules/gobblin-kafka-08/build.gradle   |   1 +
 .../service/SimpleKafkaSpecConsumer.java        | 264 -----------------
 .../service/SimpleKafkaSpecExecutor.java        | 105 -------
 .../service/SimpleKafkaSpecProducer.java        | 140 ---------
 .../service/StreamingKafkaSpecConsumer.java     | 173 -----------
 .../kafka/client/Kafka09ConsumerClient.java     |  19 +-
 .../client/AbstractBaseKafkaConsumerClient.java |   3 +-
 .../gobblin-service-kafka/build.gradle          |  44 +++
 .../service/AvroJobSpecDeserializer.java        |  70 +++++
 .../service/SimpleKafkaSpecConsumer.java        | 287 +++++++++++++++++++
 .../service/SimpleKafkaSpecExecutor.java        | 102 +++++++
 .../service/SimpleKafkaSpecProducer.java        | 157 ++++++++++
 .../service/StreamingKafkaSpecConsumer.java     | 175 +++++++++++
 13 files changed, 855 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/build.gradle b/gobblin-modules/gobblin-kafka-08/build.gradle
index b72ce3a..bc20884 100644
--- a/gobblin-modules/gobblin-kafka-08/build.gradle
+++ b/gobblin-modules/gobblin-kafka-08/build.gradle
@@ -56,6 +56,7 @@ dependencies {
   runtime externalDependency.confluentSchemaRegistryClient
   runtime externalDependency.protobuf
 
+  testCompile project(":gobblin-modules:gobblin-service-kafka")
   testCompile project(":gobblin-runtime")
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.jsonAssert

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
deleted file mode 100644
index 083ccf3..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
-import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
-import org.apache.gobblin.kafka.client.Kafka08ConsumerClient;
-import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
-import org.apache.gobblin.util.CompletedFuture;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-@Slf4j
-public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable {
-
-  // Consumer
-  protected final GobblinKafkaConsumerClient _kafka08Consumer;
-  protected final List<KafkaPartition> _partitions;
-  protected final List<Long> _lowWatermark;
-  protected final List<Long> _nextWatermark;
-  protected final List<Long> _highWatermark;
-
-  private Iterator<KafkaConsumerRecord> messageIterator = null;
-  private int currentPartitionIdx = -1;
-  private boolean isFirstRun = true;
-
-  private final BinaryDecoder _decoder;
-  private final SpecificDatumReader<AvroJobSpec> _reader;
-  private final SchemaVersionWriter<?> _versionWriter;
-
-  public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) {
-
-    // Consumer
-    _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config);
-    List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST,
-        Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY))));
-    _partitions = kafkaTopics.get(0).getPartitions();
-    _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-    _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-    _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
-
-    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
-    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
-    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
-    _versionWriter = new FixedSchemaVersionWriter();
-  }
-
-  public SimpleKafkaSpecConsumer(Config config, Logger log) {
-    this(config, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public SimpleKafkaSpecConsumer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  @Override
-  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
-    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
-    initializeWatermarks();
-    this.currentPartitionIdx = -1;
-    while (!allPartitionsFinished()) {
-      if (currentPartitionFinished()) {
-        moveToNextPartition();
-        continue;
-      }
-      if (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        try {
-          this.messageIterator = fetchNextMessageBuffer();
-        } catch (Exception e) {
-          log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
-              getCurrentPartition()), e);
-          moveToNextPartition();
-          continue;
-        }
-        if (this.messageIterator == null || !this.messageIterator.hasNext()) {
-          moveToNextPartition();
-          continue;
-        }
-      }
-      while (!currentPartitionFinished()) {
-        if (!this.messageIterator.hasNext()) {
-          break;
-        }
-
-        KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
-
-        // Even though we ask Kafka to give us a message buffer starting from offset x, it may
-        // return a buffer that starts from offset smaller than x, so we need to skip messages
-        // until we get to x.
-        if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
-          continue;
-        }
-
-        _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
-        try {
-          final AvroJobSpec record;
-
-          if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
-            record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
-          } else if (nextValidMessage instanceof DecodeableKafkaRecord){
-            record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue();
-          } else {
-            throw new IllegalStateException(
-                "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
-                    + " or DecodeableKafkaRecord");
-          }
-
-          JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
-
-          Properties props = new Properties();
-          props.putAll(record.getProperties());
-          jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
-              .withDescription(record.getDescription()).withConfigAsProperties(props);
-
-          if (!record.getTemplateUri().isEmpty()) {
-            jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
-          }
-
-          String verbName = record.getMetadata().get(VERB_KEY);
-          Verb verb = Verb.valueOf(verbName);
-
-          changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build()));
-        } catch (Throwable t) {
-          log.error("Could not decode record at partition " + this.currentPartitionIdx +
-              " offset " + nextValidMessage.getOffset());
-        }
-      }
-    }
-
-    return new CompletedFuture(changesSpecs, null);
-  }
-
-  private void initializeWatermarks() {
-    initializeLowWatermarks();
-    initializeHighWatermarks();
-  }
-
-  private void initializeLowWatermarks() {
-    try {
-      int i=0;
-      for (KafkaPartition kafkaPartition : _partitions) {
-        if (isFirstRun) {
-          long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition);
-          _lowWatermark.set(i, earliestOffset);
-        } else {
-          _lowWatermark.set(i, _highWatermark.get(i));
-        }
-        i++;
-      }
-      isFirstRun = false;
-    } catch (KafkaOffsetRetrievalFailureException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void initializeHighWatermarks() {
-    try {
-      int i=0;
-      for (KafkaPartition kafkaPartition : _partitions) {
-        long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition);
-        _highWatermark.set(i, latestOffset);
-        i++;
-      }
-    } catch (KafkaOffsetRetrievalFailureException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private boolean allPartitionsFinished() {
-    return this.currentPartitionIdx >= _nextWatermark.size();
-  }
-
-  private boolean currentPartitionFinished() {
-    if (this.currentPartitionIdx == -1) {
-      return true;
-    } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private int moveToNextPartition() {
-    this.messageIterator = null;
-    return this.currentPartitionIdx ++;
-  }
-
-  private KafkaPartition getCurrentPartition() {
-    return _partitions.get(this.currentPartitionIdx);
-  }
-
-  private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
-    return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx),
-        _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
-  }
-
-  private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
-    InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
-    _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
-
-    Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
-
-    return _reader.read(null, decoder);
-  }
-
-  @Override
-  public void close() throws IOException {
-    _kafka08Consumer.close();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
deleted file mode 100644
index 8545bf6..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.Future;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-import com.google.common.io.Closer;
-
-import org.slf4j.Logger;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
-
-/**
- * An {@link SpecExecutor} that use Kafka as the communication mechanism.
- */
-public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
-  public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
-
-
-  protected static final String VERB_KEY = "Verb";
-
-  private SpecProducer<Spec> specProducer;
-
-  public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
-    super(config, log);
-    specProducer = new SimpleKafkaSpecProducer(config, log);
-  }
-
-  /**
-   * Constructor with no logging, necessary for simple use case.
-   * @param config
-   */
-  public SimpleKafkaSpecExecutor(Config config) {
-    this(config, Optional.absent());
-  }
-
-  @Override
-  public Future<? extends SpecProducer> getProducer() {
-    return new CompletedFuture<>(this.specProducer, null);
-  }
-
-  @Override
-  public Future<String> getDescription() {
-    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    optionalCloser = Optional.of(Closer.create());
-    specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    if (optionalCloser.isPresent()) {
-      optionalCloser.get().close();
-    } else {
-      log.warn("There's no Closer existed in " + this.getClass().getName());
-    }
-  }
-
-  public static class SpecExecutorInstanceDataPacket implements Serializable {
-
-    protected Verb _verb;
-    protected URI _uri;
-    protected Spec _spec;
-
-    public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) {
-      _verb = verb;
-      _uri = uri;
-      _spec = spec;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
deleted file mode 100644
index 13aae6f..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Future;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.kafka.writer.Kafka08DataWriter;
-import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
-import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
-import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.WriteCallback;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-@NotThreadSafe
-public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
-
-  // Producer
-  protected Kafka08DataWriter<byte[]> _kafka08Producer;
-  private final AvroSerializer<AvroJobSpec> _serializer;
-  private Config _config;
-
-  public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
-
-    try {
-      _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
-      _config = config;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create AvroBinarySerializer", e);
-    }
-  }
-
-  public SimpleKafkaSpecProducer(Config config, Logger log) {
-    this(config, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public SimpleKafkaSpecProducer(Config config) {
-    this(config, Optional.<Logger>absent());
-  }
-
-  @Override
-  public Future<?> addSpec(Spec addedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
-
-    log.info("Adding Spec: " + addedSpec + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<?> updateSpec(Spec updatedSpec) {
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
-
-    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
-        .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build();
-
-    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
-
-    return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
-  }
-
-  @Override
-  public Future<? extends List<Spec>> listSpecs() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() throws IOException {
-    _kafka08Producer.close();
-  }
-
-  private Kafka08DataWriter<byte[]> getKafka08Producer() {
-    if (null == _kafka08Producer) {
-      _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config));
-    }
-    return _kafka08Producer;
-  }
-
-  private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) {
-    if (spec instanceof JobSpec) {
-      JobSpec jobSpec = (JobSpec) spec;
-      AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
-
-      avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
-          .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
-          .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
-
-      if (jobSpec.getTemplateURI().isPresent()) {
-        avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
-      }
-
-      return avroJobSpecBuilder.build();
-    } else {
-      throw new RuntimeException("Unsupported spec type " + spec.getClass());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
deleted file mode 100644
index fd42211..0000000
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecConsumer;
-import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
-import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
-import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
-import org.apache.gobblin.util.CompletedFuture;
-import org.apache.gobblin.util.ConfigUtils;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-/**
- * SpecConsumer that consumes from kafka in a streaming manner
- * Implemented {@link AbstractIdleService} for starting up and shutting down.
- */
-public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
-  public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
-  private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
-  private final AvroJobSpecKafkaJobMonitor _jobMonitor;
-  private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue;
-
-  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
-    String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
-    Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
-        KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
-
-    try {
-      _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory())
-          .forConfig(config.withFallback(defaults), jobCatalog);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create job monitor", e);
-    }
-
-    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
-        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
-
-    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
-    jobCatalog.addListener(new JobSpecListener());
-  }
-
-  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
-    this(config, jobCatalog, Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) {
-    this(config, jobCatalog, Optional.<Logger>absent());
-  }
-
-  /**
-   * This method returns job specs receive from Kafka. It will block if there are no job specs.
-   * @return list of (verb, jobspecs) pairs.
-   */
-  @Override
-  public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() {
-    List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>();
-
-    try {
-      Pair<Verb, Spec> specPair = _jobSpecQueue.take();
-
-      do {
-        changesSpecs.add(specPair);
-
-        // if there are more elements then pass them along in this call
-        specPair = _jobSpecQueue.poll();
-      } while (specPair != null);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-
-    return new CompletedFuture(changesSpecs, null);
-  }
-
-  @Override
-  protected void startUp() {
-    _jobMonitor.startAsync().awaitRunning();
-  }
-
-  @Override
-  protected void shutDown() {
-    _jobMonitor.stopAsync().awaitTerminated();
-  }
-
-  @Override
-  public void close() throws IOException {
-    shutDown();
-  }
-
-  /**
-   * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of
-   * {@link StreamingKafkaSpecConsumer}
-   */
-  protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
-    public JobSpecListener() {
-      super(StreamingKafkaSpecConsumer.this.log);
-    }
-
-    @Override public void onAddJob(JobSpec addedJob) {
-      super.onAddJob(addedJob);
-
-      try {
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
-      super.onDeleteJob(deletedJobURI, deletedJobVersion);
-      try {
-        JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
-
-        Properties props = new Properties();
-        jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
-
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build()));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override public void onUpdateJob(JobSpec updatedJob) {
-      super.onUpdateJob(updatedJob);
-
-      try {
-        _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index b6cd35d..9b952ab 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.kafka.client;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -81,6 +82,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
     super(config);
     Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
         "Missing required property " + GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY);
+
+    Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, AbstractBaseKafkaConsumerClient.CONFIG_PREFIX_NO_DOT);
+
     Properties props = new Properties();
     props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
     props.put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT);
@@ -89,6 +93,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
         ConfigUtils.getString(config, GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER));
     props.put(KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY,
         config.getString(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY));
+
+    props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
     this.consumer = new KafkaConsumer<>(props);
 
   }
@@ -112,12 +119,20 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
 
   @Override
   public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
-    throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09");
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    this.consumer.assign(Collections.singletonList(topicPartition));
+    this.consumer.seekToBeginning(topicPartition);
+
+    return this.consumer.position(topicPartition);
   }
 
   @Override
   public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
-    throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09");
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    this.consumer.assign(Collections.singletonList(topicPartition));
+    this.consumer.seekToEnd(topicPartition);
+
+    return this.consumer.position(topicPartition);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 00b751b..638a879 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -41,7 +41,8 @@ import javax.annotation.Nullable;
  */
 public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient {
 
-  public static final String CONFIG_PREFIX = "source.kafka.";
+  public static final String CONFIG_PREFIX_NO_DOT = "source.kafka";
+  public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + ".";
   public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis";
   public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second
   public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/build.gradle b/gobblin-modules/gobblin-service-kafka/build.gradle
new file mode 100644
index 0000000..e4d1146
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/build.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+  compile project(":gobblin-runtime")
+
+  compile externalDependency.avro
+  compile externalDependency.slf4j
+  compile externalDependency.lombok
+  compile externalDependency.typesafeConfig
+
+  testCompile externalDependency.testng
+}
+
+configurations {
+  compile { transitive = false }
+  // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+  // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+  // HADOOP-5254 and MAPREDUCE-5664
+  all*.exclude group: 'xml-apis'
+  all*.exclude group: 'xerces'
+}
+
+test {
+  workingDir rootProject.rootDir
+}
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
new file mode 100644
index 0000000..879dea4
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+/**
+ * A deserializer that converts a byte array into an {@link AvroJobSpec}
+ */
+public class AvroJobSpecDeserializer implements Deserializer<AvroJobSpec> {
+  private BinaryDecoder _decoder;
+  private SpecificDatumReader<AvroJobSpec> _reader;
+  private SchemaVersionWriter<?> _versionWriter;
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
+    _versionWriter = new FixedSchemaVersionWriter();
+  }
+
+  @Override
+  public AvroJobSpec deserialize(String topic, byte[] data) {
+    try (InputStream is = new ByteArrayInputStream(data)) {
+      _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+      Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
+
+      return _reader.read(null, decoder);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not decode message");
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
new file mode 100644
index 0000000..139d204
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable {
+  private static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "spec.kafka.consumerClientClassFactory";
+  private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS =
+      "org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory";
+
+  // Consumer
+  protected final GobblinKafkaConsumerClient _kafkaConsumer;
+  protected final List<KafkaPartition> _partitions;
+  protected final List<Long> _lowWatermark;
+  protected final List<Long> _nextWatermark;
+  protected final List<Long> _highWatermark;
+
+  private Iterator<KafkaConsumerRecord> messageIterator = null;
+  private int currentPartitionIdx = -1;
+  private boolean isFirstRun = true;
+
+  private final BinaryDecoder _decoder;
+  private final SpecificDatumReader<AvroJobSpec> _reader;
+  private final SchemaVersionWriter<?> _versionWriter;
+
+  public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) {
+
+    // Consumer
+    String kafkaConsumerClientClass = ConfigUtils.getString(config, CONSUMER_CLIENT_FACTORY_CLASS_KEY,
+        DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS);
+
+    try {
+      Class<?> clientFactoryClass = (Class<?>) Class.forName(kafkaConsumerClientClass);
+      final GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory =
+          (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory)
+              ConstructorUtils.invokeConstructor(clientFactoryClass);
+
+      _kafkaConsumer = factory.create(config);
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      if (log.isPresent()) {
+        log.get().error("Failed to instantiate Kafka consumer from class " + kafkaConsumerClientClass, e);
+      }
+
+      throw new RuntimeException("Failed to instantiate Kafka consumer", e);
+    }
+
+    List<KafkaTopic> kafkaTopics = _kafkaConsumer.getFilteredTopics(Collections.EMPTY_LIST,
+        Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY))));
+    _partitions = kafkaTopics.get(0).getPartitions();
+    _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+    _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L));
+
+    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+    _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+    _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$);
+    _versionWriter = new FixedSchemaVersionWriter();
+  }
+
+  public SimpleKafkaSpecConsumer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecConsumer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
+    initializeWatermarks();
+    this.currentPartitionIdx = -1;
+    while (!allPartitionsFinished()) {
+      if (currentPartitionFinished()) {
+        moveToNextPartition();
+        continue;
+      }
+      if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+        try {
+          this.messageIterator = fetchNextMessageBuffer();
+        } catch (Exception e) {
+          log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
+              getCurrentPartition()), e);
+          moveToNextPartition();
+          continue;
+        }
+        if (this.messageIterator == null || !this.messageIterator.hasNext()) {
+          moveToNextPartition();
+          continue;
+        }
+      }
+      while (!currentPartitionFinished()) {
+        if (!this.messageIterator.hasNext()) {
+          break;
+        }
+
+        KafkaConsumerRecord nextValidMessage = this.messageIterator.next();
+
+        // Even though we ask Kafka to give us a message buffer starting from offset x, it may
+        // return a buffer that starts from offset smaller than x, so we need to skip messages
+        // until we get to x.
+        if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) {
+          continue;
+        }
+
+        _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
+        try {
+          final AvroJobSpec record;
+
+          if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
+            record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
+          } else if (nextValidMessage instanceof DecodeableKafkaRecord){
+            record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue();
+          } else {
+            throw new IllegalStateException(
+                "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
+                    + " or DecodeableKafkaRecord");
+          }
+
+          JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
+
+          Properties props = new Properties();
+          props.putAll(record.getProperties());
+          jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
+              .withDescription(record.getDescription()).withConfigAsProperties(props);
+
+          if (!record.getTemplateUri().isEmpty()) {
+            jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
+          }
+
+          String verbName = record.getMetadata().get(VERB_KEY);
+          SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
+          changesSpecs.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpecBuilder.build()));
+        } catch (Throwable t) {
+          log.error("Could not decode record at partition " + this.currentPartitionIdx +
+              " offset " + nextValidMessage.getOffset());
+        }
+      }
+    }
+
+    return new CompletedFuture(changesSpecs, null);
+  }
+
+  private void initializeWatermarks() {
+    initializeLowWatermarks();
+    initializeHighWatermarks();
+  }
+
+  private void initializeLowWatermarks() {
+    try {
+      int i=0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        if (isFirstRun) {
+          long earliestOffset = _kafkaConsumer.getEarliestOffset(kafkaPartition);
+          _lowWatermark.set(i, earliestOffset);
+        } else {
+          _lowWatermark.set(i, _highWatermark.get(i));
+        }
+        i++;
+      }
+      isFirstRun = false;
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void initializeHighWatermarks() {
+    try {
+      int i=0;
+      for (KafkaPartition kafkaPartition : _partitions) {
+        long latestOffset = _kafkaConsumer.getLatestOffset(kafkaPartition);
+        _highWatermark.set(i, latestOffset);
+        i++;
+      }
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean allPartitionsFinished() {
+    return this.currentPartitionIdx >= _nextWatermark.size();
+  }
+
+  private boolean currentPartitionFinished() {
+    if (this.currentPartitionIdx == -1) {
+      return true;
+    } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private int moveToNextPartition() {
+    this.messageIterator = null;
+    return this.currentPartitionIdx ++;
+  }
+
+  private KafkaPartition getCurrentPartition() {
+    return _partitions.get(this.currentPartitionIdx);
+  }
+
+  private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() {
+    return _kafkaConsumer.consume(_partitions.get(this.currentPartitionIdx),
+        _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx));
+  }
+
+  private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
+    InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes());
+    _versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+    Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
+
+    return _reader.read(null, decoder);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _kafkaConsumer.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
new file mode 100644
index 0000000..c3dfcb3
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
+import org.apache.gobblin.util.CompletedFuture;
+
+/**
+ * An {@link SpecExecutor} that use Kafka as the communication mechanism.
+ */
+public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
+  public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
+
+
+  protected static final String VERB_KEY = "Verb";
+
+  private SpecProducer<Spec> specProducer;
+
+  public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
+    super(config, log);
+    specProducer = new SimpleKafkaSpecProducer(config, log);
+  }
+
+  /**
+   * Constructor with no logging, necessary for simple use case.
+   * @param config
+   */
+  public SimpleKafkaSpecExecutor(Config config) {
+    this(config, Optional.absent());
+  }
+
+  @Override
+  public Future<? extends SpecProducer> getProducer() {
+    return new CompletedFuture<>(this.specProducer, null);
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    optionalCloser = Optional.of(Closer.create());
+    specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    if (optionalCloser.isPresent()) {
+      optionalCloser.get().close();
+    } else {
+      log.warn("There's no Closer existed in " + this.getClass().getName());
+    }
+  }
+
+  public static class SpecExecutorInstanceDataPacket implements Serializable {
+
+    protected SpecExecutor.Verb _verb;
+    protected URI _uri;
+    protected Spec _spec;
+
+    public SpecExecutorInstanceDataPacket(SpecExecutor.Verb verb, URI uri, Spec spec) {
+      _verb = verb;
+      _uri = uri;
+      _spec = spec;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
new file mode 100644
index 0000000..a5163db
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.WriteCallback;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NotThreadSafe
+public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
+  private static final String KAFKA_DATA_WRITER_CLASS_KEY = "spec.kafka.dataWriterClass";
+  private static final String DEFAULT_KAFKA_DATA_WRITER_CLASS =
+      "org.apache.gobblin.kafka.writer.Kafka08DataWriter";
+
+  // Producer
+  protected AsyncDataWriter<byte[]> _kafkaProducer;
+  private final AvroSerializer<AvroJobSpec> _serializer;
+  private Config _config;
+  private final String _kafkaProducerClassName;
+
+  public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
+    _kafkaProducerClassName = ConfigUtils.getString(config, KAFKA_DATA_WRITER_CLASS_KEY,
+        DEFAULT_KAFKA_DATA_WRITER_CLASS);
+
+    try {
+      _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
+      _config = config;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create AvroBinarySerializer", e);
+    }
+  }
+
+  public SimpleKafkaSpecProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public SimpleKafkaSpecProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
+
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
+
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())).build();
+
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
+
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    _kafkaProducer.close();
+  }
+
+  private AsyncDataWriter<byte[]> getKafkaProducer() {
+    if (null == _kafkaProducer) {
+      try {
+        Class<?> kafkaProducerClass = (Class<?>) Class.forName(_kafkaProducerClassName);
+        _kafkaProducer = (AsyncDataWriter<byte[]>) ConstructorUtils.invokeConstructor(kafkaProducerClass,
+            ConfigUtils.configToProperties(_config));
+      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+        log.error("Failed to instantiate Kafka consumer from class " + _kafkaProducerClassName, e);
+
+        throw new RuntimeException("Failed to instantiate Kafka consumer", e);
+      }
+    }
+    return _kafkaProducer;
+  }
+
+  private AvroJobSpec convertToAvroJobSpec(Spec spec, SpecExecutor.Verb verb) {
+    if (spec instanceof JobSpec) {
+      JobSpec jobSpec = (JobSpec) spec;
+      AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
+
+      avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
+          .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
+          .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
+
+      if (jobSpec.getTemplateURI().isPresent()) {
+        avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
+      }
+
+      return avroJobSpecBuilder.build();
+    } else {
+      throw new RuntimeException("Unsupported spec type " + spec.getClass());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
new file mode 100644
index 0000000..7d7b702
--- /dev/null
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor;
+import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
+import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+/**
+ * SpecConsumer that consumes from kafka in a streaming manner
+ * Implemented {@link AbstractIdleService} for starting up and shutting down.
+ */
+public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
+  public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
+  private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
+  private final AvroJobSpecKafkaJobMonitor _jobMonitor;
+  private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue;
+
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
+    String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
+    Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
+        KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+
+    try {
+      _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory())
+          .forConfig(config.withFallback(defaults), jobCatalog);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create job monitor", e);
+    }
+
+    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
+        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+
+    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
+    jobCatalog.addListener(new JobSpecListener());
+  }
+
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
+    this(config, jobCatalog, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) {
+    this(config, jobCatalog, Optional.<Logger>absent());
+  }
+
+  /**
+   * This method returns job specs receive from Kafka. It will block if there are no job specs.
+   * @return list of (verb, jobspecs) pairs.
+   */
+  @Override
+  public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
+    List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>();
+
+    try {
+      Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take();
+
+      do {
+        changesSpecs.add(specPair);
+
+        // if there are more elements then pass them along in this call
+        specPair = _jobSpecQueue.poll();
+      } while (specPair != null);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    return new CompletedFuture(changesSpecs, null);
+  }
+
+  @Override
+  protected void startUp() {
+    _jobMonitor.startAsync().awaitRunning();
+  }
+
+  @Override
+  protected void shutDown() {
+    _jobMonitor.stopAsync().awaitTerminated();
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutDown();
+  }
+
+  /**
+   * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of
+   * {@link StreamingKafkaSpecConsumer}
+   */
+  protected class JobSpecListener extends DefaultJobCatalogListenerImpl {
+    public JobSpecListener() {
+      super(StreamingKafkaSpecConsumer.this.log);
+    }
+
+    @Override public void onAddJob(JobSpec addedJob) {
+      super.onAddJob(addedJob);
+
+      try {
+        _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.ADD, addedJob));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
+      super.onDeleteJob(deletedJobURI, deletedJobVersion);
+      try {
+        JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI);
+
+        Properties props = new Properties();
+        jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
+
+        _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.DELETE, jobSpecBuilder.build()));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override public void onUpdateJob(JobSpec updatedJob) {
+      super.onUpdateJob(updatedJob);
+
+      try {
+        _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message