kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: KAFKA-4986; Producer per StreamTask support (KIP-129)
Date: Fri, 14 Apr 2017 14:21:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1e93c3b9a -> 148f8c254


KAFKA-4986; Producer per StreamTask support (KIP-129)

Enable a producer per task if the exactly-once config (processing.guarantee=exactly_once) is enabled.
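
For context, a minimal sketch of how an application opts into the new guarantee (the application id, bootstrap servers, and topic names below are illustrative, not part of this commit):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ExactlyOnceExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-demo");          // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            // New in this commit: with exactly_once, each StreamTask gets its own
            // producer instead of sharing the single per-thread producer.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");

            final KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic"); // trivial pass-through topology

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }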

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2773 from mjsax/exactly-once-streams-producer-per-task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/148f8c25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/148f8c25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/148f8c25

Branch: refs/heads/trunk
Commit: 148f8c25453e453f56ef429f8ef607de808de679
Parents: 1e93c3b
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri Apr 14 15:07:49 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Apr 14 15:19:52 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  10 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 172 ++++++-----
 .../internals/RecordCollectorImpl.java          |   6 +
 .../streams/processor/internals/StreamTask.java |  69 +++--
 .../processor/internals/StreamThread.java       | 156 ++++++----
 .../processor/internals/StreamTaskTest.java     |  45 ++-
 .../processor/internals/StreamThreadTest.java   | 282 ++++++++++++++-----
 .../apache/kafka/test/MockClientSupplier.java   |   8 +-
 8 files changed, 515 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 607ba69..042be6b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -146,13 +146,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread).java"/>
-    <suppress checks="ClassFanOutComplexity"
-              files="KStreamImpl.java"/>
-    <suppress checks="ClassFanOutComplexity"
-              files="KTableImpl.java"/>
-    <suppress checks="ClassFanOutComplexity"
-              files="StreamThread.java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="StreamPartitionAssignor.java"/>
@@ -184,7 +178,7 @@
 
     <!-- streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(StreamTaskTest|ProcessorTopologyTestDriver).java"/>
+              files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
 
     <suppress checks="MethodLength"
               files="KStreamKTableJoinIntegrationTest.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f968dbc..a04d7f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -94,72 +94,43 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String PRODUCER_PREFIX = "producer.";
 
-    /** {@code state.dir} */
-    public static final String STATE_DIR_CONFIG = "state.dir";
-    private static final String STATE_DIR_DOC = "Directory location for state store.";
-
-    /**
-     * {@code zookeeper.connect}
-     * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
-     */
-    @Deprecated
-    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
-
-    /** {@code commit.interval.ms} */
-    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
-    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
-    /** {@code poll.ms} */
-    public static final String POLL_MS_CONFIG = "poll.ms";
-    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+    /** {@code application.id} */
+    public static final String APPLICATION_ID_CONFIG = "application.id";
+    private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
 
-    /** {@code num.stream.threads} */
-    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
-    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+    /** {@code application.server} */
+    public static final String APPLICATION_SERVER_CONFIG = "application.server";
+    private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
 
-    /** {@code num.standby.replicas} */
-    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
-    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+    /** {@code bootstrap.servers} */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
     /** {@code buffered.records.per.partition} */
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
     private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
 
-    /** {@code state.cleanup.delay} */
-    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
-    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed";
-
-    /** {@code timestamp.extractor} */
-    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+    /** {@code cache.max.bytes.buffering} */
+    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
+    private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
 
-    /** {@code partition.grouper} */
-    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
-    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+    /** {@code client.id} */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
-    /** {@code application.id} */
-    public static final String APPLICATION_ID_CONFIG = "application.id";
-    private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+    /** {@code commit.interval.ms} */
+    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
 
-    /** {@code replication.factor} */
-    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
-    private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
+    /** {@code connections.max.idle.ms} */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
 
     /** {@code key.serde} */
     public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
     private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface.";
 
-    /** {@code value.serde} */
-    public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
-    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
-
-    /**{@code user.endpoint} */
-    public static final String APPLICATION_SERVER_CONFIG = "application.server";
-    private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
-
-    /** {@code metrics.sample.window.ms} */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+    /** {@code metadata.max.age.ms} */
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
 
     /** {@code metrics.num.samples} */
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
@@ -170,48 +141,89 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code metric.reporters} */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
-    /** {@code bootstrap.servers} */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+    /** {@code metrics.sample.window.ms} */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
-    /** {@code client.id} */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+    /** {@code num.standby.replicas} */
+    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
 
-    /** {@code rocksdb.config.setter} */
-    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
-    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+    /** {@code num.stream.threads} */
+    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
 
-    /** {@code windowstore.changelog.additional.retention.ms} */
-    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
-    private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
+    /** {@code partition.grouper} */
+    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
+    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+
+    /** {@code poll.ms} */
+    public static final String POLL_MS_CONFIG = "poll.ms";
+    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
 
-    /** {@code cache.max.bytes.buffering} */
-    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
-    private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
+    /** {@code processing.guarantee} */
+    public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
+    private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>.";
 
-    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
-    private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
-    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+    /** {@code receive.buffer.bytes} */
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+    private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
 
-    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
-    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+    /** {@code reconnect.backoff.ms} */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+    private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
 
+    /** {@code replication.factor} */
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
+
+    /** {@code request.timeout.ms} */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
+    /** {@code retry.backoff.ms} */
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
     private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
 
-    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
-    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+    /** {@code rocksdb.config.setter} */
+    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
+    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
 
-    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
-    private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+    /** {@code security.protocol} */
+    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
 
+    /** {@code send.buffer.bytes} */
     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
     private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
 
-    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
-    private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+    /** {@code state.cleanup.delay} */
+    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed";
 
-    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+    /** {@code state.dir} */
+    public static final String STATE_DIR_CONFIG = "state.dir";
+    private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+    /** {@code timestamp.extractor} */
+    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+    /** {@code value.serde} */
+    public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
+    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
+
+    /** {@code windowstore.changelog.additional.retention.ms} */
+    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
+    private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
+
+    /**
+     * {@code zookeeper.connect}
+     * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
+     */
+    @Deprecated
+    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
 
     static {
         CONFIG = new ConfigDef()
@@ -382,7 +394,13 @@ public class StreamsConfig extends AbstractConfig {
                     40 * 1000,
                     atLeast(0),
                     ConfigDef.Importance.MEDIUM,
-                    REQUEST_TIMEOUT_MS_DOC);
+                    REQUEST_TIMEOUT_MS_DOC)
+            .define(PROCESSING_GUARANTEE_CONFIG,
+                    ConfigDef.Type.STRING,
+                    "at_least_once",
+                    in("at_least_once", "exactly_once"),
+                    Importance.MEDIUM,
+                    PROCESSING_GUARANTEE_DOC);
     }
 
     // this is the list of configs for underlying clients
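
Note the in("at_least_once", "exactly_once") validator in the define() above: ConfigDef rejects any other value as soon as the StreamsConfig is constructed. A small sketch of that fail-fast behavior (the id and bootstrap values are illustrative):

    import java.util.Properties;

    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.streams.StreamsConfig;

    public class ProcessingGuaranteeValidationDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "validation-demo");   // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly-once"); // hyphen, not underscore

            try {
                new StreamsConfig(props);
            } catch (final ConfigException e) {
                // The in(...) validator only accepts "at_least_once" and "exactly_once",
                // so the misspelled value fails here instead of being silently ignored.
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }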

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index c4a09de..0122ea0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -146,4 +146,10 @@ public class RecordCollectorImpl implements RecordCollector {
     public Map<TopicPartition, Long> offsets() {
         return this.offsets;
     }
+
+    // for testing only
+    Producer<byte[], byte[]> producer() {
+        return producer;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 092d6e7..7524087 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
@@ -54,18 +55,18 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
-    private final Map<TopicPartition, RecordQueue> partitionQueues;
 
     private final Map<TopicPartition, Long> consumedOffsets;
     private final RecordCollector recordCollector;
     private final int maxBufferedSize;
+    private final boolean exactlyOnceEnabled;
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
     private boolean requiresPoll = true;
     private final Time time;
     private final TaskMetrics metrics;
-    private Runnable commitDelegate = new Runnable() {
+    private final Runnable commitDelegate = new Runnable() {
         @Override
         public void run() {
             // 1) flush local state
@@ -94,54 +95,55 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param stateDirectory        the {@link StateDirectory} created by the thread
      * @param recordCollector       the instance of {@link RecordCollector} used to produce records
      */
-    public StreamTask(TaskId id,
-                      String applicationId,
-                      Collection<TopicPartition> partitions,
-                      ProcessorTopology topology,
-                      Consumer<byte[], byte[]> consumer,
+    public StreamTask(final TaskId id,
+                      final String applicationId,
+                      final Collection<TopicPartition> partitions,
+                      final ProcessorTopology topology,
+                      final Consumer<byte[], byte[]> consumer,
                       final ChangelogReader changelogReader,
-                      StreamsConfig config,
-                      StreamsMetrics metrics,
-                      StateDirectory stateDirectory,
-                      ThreadCache cache,
-                      Time time,
+                      final StreamsConfig config,
+                      final StreamsMetrics metrics,
+                      final StateDirectory stateDirectory,
+                      final ThreadCache cache,
+                      final Time time,
                       final RecordCollector recordCollector) {
         super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache);
-        this.punctuationQueue = new PunctuationQueue();
-        this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        punctuationQueue = new PunctuationQueue();
+        maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
         this.metrics = new TaskMetrics(metrics);
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
-        partitionQueues = new HashMap<>();
+        final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
-        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        final TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
 
-        for (TopicPartition partition : partitions) {
-            SourceNode source = topology.source(partition.topic());
-            RecordQueue queue = createRecordQueue(partition, source, timestampExtractor);
+        for (final TopicPartition partition : partitions) {
+            final SourceNode source = topology.source(partition.topic());
+            final RecordQueue queue = createRecordQueue(partition, source, timestampExtractor);
             partitionQueues.put(partition, queue);
         }
 
-        this.logPrefix = String.format("task [%s]", id);
+        logPrefix = String.format("task [%s]", id);
 
-        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
+        partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
 
         // initialize the consumed offset cache
-        this.consumedOffsets = new HashMap<>();
+        consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
         this.recordCollector = recordCollector;
 
         // initialize the topology with its own context
-        this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
+        processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
         this.time = time;
         // initialize the state stores
         log.info("{} Initializing state stores", logPrefix);
         initializeStateStores();
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         initTopology();
-        this.processorContext.initialized();
+        processorContext.initialized();
     }
 
     /**
@@ -150,7 +152,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      *
      * @param partition the partition
      * @param records  the records
-     * @returns the number of added records
+     * @return the number of added records
      */
     @SuppressWarnings("unchecked")
     public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
@@ -389,6 +391,18 @@ public class StreamTask extends AbstractTask implements Punctuator {
         metrics.removeAllSensors();
     }
 
+    void closeProducer() {
+        if (exactlyOnceEnabled) {
+            try {
+                recordCollector.close();
+            } catch (final Throwable e) {
+                log.error("{} Failed to close producer: ", logPrefix, e);
+            }
+        } else {
+            throw new IllegalStateException("Tasks should only close producers if exactly-once semantics is enabled.");
+        }
+    }
+
     @Override
     protected Map<TopicPartition, Long> recordCollectorOffsets() {
         return recordCollector.offsets();
@@ -439,4 +453,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         recordCollector.flush();
     }
 
+    // for testing only
+    Producer<byte[], byte[]> producer() {
+        return ((RecordCollectorImpl) recordCollector).producer();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 03a9789..cd78a85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -184,7 +184,8 @@ public class StreamThread extends Thread {
 
     protected final StreamsConfig config;
     protected final TopologyBuilder builder;
-    protected final Producer<byte[], byte[]> producer;
+    protected Producer<byte[], byte[]> threadProducer;
+    protected final KafkaClientSupplier clientSupplier;
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
@@ -207,12 +208,13 @@ public class StreamThread extends Thread {
     // TODO: this is not private only for tests, should be better refactored
     final StateDirectory stateDirectory;
     private String originalReset;
-    private StreamPartitionAssignor partitionAssignor = null;
+    private StreamPartitionAssignor partitionAssignor;
     private boolean cleanRun = false;
     private long timerStartedMs;
     private long lastCleanMs;
     private long lastCommitMs;
     private Throwable rebalanceException = null;
+    private boolean exactlyOnceEnabled;
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
@@ -233,41 +235,40 @@ public class StreamThread extends Thread {
         return threadClientId;
     }
 
-    public StreamThread(TopologyBuilder builder,
-                        StreamsConfig config,
-                        KafkaClientSupplier clientSupplier,
-                        String applicationId,
-                        String clientId,
-                        UUID processId,
-                        Metrics metrics,
-                        Time time,
-                        StreamsMetadataState streamsMetadataState,
+    public StreamThread(final TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier,
+                        final String applicationId,
+                        final String clientId,
+                        final UUID processId,
+                        final Metrics metrics,
+                        final Time time,
+                        final StreamsMetadataState streamsMetadataState,
                         final long cacheSizeBytes) {
         super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
         this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopicPattern = builder.sourceTopicPattern();
+        this.clientSupplier = clientSupplier;
+        sourceTopicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
         this.processId = processId;
-        this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
         this.streamsMetadataState = streamsMetadataState;
         threadClientId = getName();
-        this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId,
+        streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId,
             Collections.singletonMap("client-id", threadClientId));
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadClientId);
         }
-        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
+        cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
+        exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
 
-        this.logPrefix = String.format("stream-thread [%s]", threadClientId);
+        logPrefix = String.format("stream-thread [%s]", threadClientId);
 
-        // set the producer and consumer clients
-        log.info("{} Creating producer client", logPrefix);
-        this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
+        // set the consumer clients
         log.info("{} Creating consumer client", logPrefix);
-
-        Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId);
+        final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId);
 
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -275,35 +276,35 @@ public class StreamThread extends Thread {
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
 
-        this.consumer = clientSupplier.getConsumer(consumerConfigs);
+        consumer = clientSupplier.getConsumer(consumerConfigs);
         log.info("{} Creating restore consumer client", logPrefix);
-        this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
+        restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
         // initialize the task list
         // activeTasks needs to be concurrent as it can be accessed
         // by QueryableState
-        this.activeTasks = new ConcurrentHashMap<>();
-        this.standbyTasks = new HashMap<>();
-        this.activeTasksByPartition = new HashMap<>();
-        this.standbyTasksByPartition = new HashMap<>();
-        this.prevActiveTasks = new HashSet<>();
-        this.suspendedTasks = new HashMap<>();
-        this.suspendedStandbyTasks = new HashMap<>();
+        activeTasks = new ConcurrentHashMap<>();
+        standbyTasks = new HashMap<>();
+        activeTasksByPartition = new HashMap<>();
+        standbyTasksByPartition = new HashMap<>();
+        prevActiveTasks = new HashSet<>();
+        suspendedTasks = new HashMap<>();
+        suspendedStandbyTasks = new HashMap<>();
 
         // standby ktables
-        this.standbyRecords = new HashMap<>();
+        standbyRecords = new HashMap<>();
 
-        this.stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
-        this.rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
-        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
-        this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
-        this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
+        pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
 
         this.time = time;
-        this.timerStartedMs = time.milliseconds();
-        this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
-        this.lastCommitMs = timerStartedMs;
-        this.rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        timerStartedMs = time.milliseconds();
+        lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
+        lastCommitMs = timerStartedMs;
+        rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
         setState(State.RUNNING);
 
     }
@@ -356,24 +357,26 @@ public class StreamThread extends Thread {
         shutdownTasksAndState();
 
         // close all embedded clients
-        try {
-            producer.close();
-        } catch (Throwable e) {
-            log.error("{} Failed to close producer: ", logPrefix, e);
+        if (threadProducer != null) {
+            try {
+                threadProducer.close();
+            } catch (Throwable e) {
+                log.error("{} Failed to close producer: ", logPrefix, e);
+            }
         }
         try {
             consumer.close();
-        } catch (Throwable e) {
+        } catch (final Throwable e) {
             log.error("{} Failed to close consumer: ", logPrefix, e);
         }
         try {
             restoreConsumer.close();
-        } catch (Throwable e) {
+        } catch (final Throwable e) {
             log.error("{} Failed to close restore consumer: ", logPrefix, e);
         }
         try {
             partitionAssignor.close();
-        } catch (Throwable e) {
+        } catch (final Throwable e) {
             log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e);
         }
 
@@ -420,6 +423,10 @@ public class StreamThread extends Thread {
         if (cleanRun && firstException.get() == null) {
             firstException.set(commitOffsets());
         }
+        // Close all task producers
+        if (exactlyOnceEnabled) {
+            closeAllProducers();
+        }
         // remove the changelog partitions from restore consumer
         unAssignChangeLogPartitions();
     }
@@ -479,6 +486,21 @@ public class StreamThread extends Thread {
         return firstException;
     }
 
+    private void closeAllProducers() {
+        for (final StreamTask task : activeTasks.values()) {
+            log.info("{} Closing the producer of task {}", StreamThread.this.logPrefix, task.id());
+            try {
+                task.closeProducer();
+            } catch (RuntimeException e) {
+                log.error("{} Failed while executing {} {} due to {}: ",
+                    StreamThread.this.logPrefix,
+                    task.getClass().getSimpleName(),
+                    task.id(),
+                    e);
+            }
+        }
+    }
+
     private List<AbstractTask> activeAndStandbytasks() {
         final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
         tasks.addAll(standbyTasks.values());
@@ -941,18 +963,42 @@ public class StreamThread extends Thread {
         }
     }
 
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
-        log.debug("{} Creating new active task {} with assigned partitions {}", logPrefix, id, partitions);
+    protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+        log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
-        final ProcessorTopology topology = builder.build(id.topicGroupId);
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
-        try {
-            return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
-        } finally {
-            log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
+        return new StreamTask(
+            id,
+            applicationId,
+            partitions,
+            builder.build(id.topicGroupId),
+            consumer,
+            storeChangelogReader,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            createRecordCollector(id));
+    }
+
+    private RecordCollector createRecordCollector(final TaskId id) {
+        final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
+
+        final Producer<byte[], byte[]> producer;
+        if (exactlyOnceEnabled) {
+            log.info("{} Creating producer client for task {}", logPrefix, id);
+            producer = clientSupplier.getProducer(producerConfigs);
+        } else {
+            if (threadProducer == null) {
+                log.info("{} Creating shared producer client", logPrefix);
+                threadProducer = clientSupplier.getProducer(producerConfigs);
+            }
+            producer = threadProducer;
         }
+
+        return new RecordCollectorImpl(producer, id.toString());
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment, final long start) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 6256434..28dc7ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -51,8 +51,8 @@ import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.Test;
 import org.junit.Before;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -553,6 +553,28 @@ public class StreamTaskTest {
         assertTrue(source2.closed);
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowWhenClosingProducerForNonEoS() {
+        task.closeProducer();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCloseProducerWhenExactlyOnceEnabled() {
+        final Map properties = this.config.values();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+        final StreamsConfig config = new StreamsConfig(properties);
+
+        final MockedProducer producer = new MockedProducer(null);
+
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+            changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId"));
+
+        task.closeProducer();
+
+        assertTrue(producer.closed);
+    }
+
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
@@ -579,4 +601,25 @@ public class StreamTaskTest {
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
+
+    private final static class MockedProducer extends MockProducer {
+        private final AtomicBoolean flushed;
+        boolean closed = false;
+
+        MockedProducer(final AtomicBoolean flushed) {
+            super(false, null, null);
+            this.flushed = flushed;
+        }
+
+        @Override
+        public void flush() {
+            flushed.set(true);
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e36a236..b726884 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,23 +16,11 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -53,24 +41,40 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Before;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.EMPTY_SET;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.junit.Assert.assertThat;
-
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -102,7 +106,7 @@ public class StreamThreadTest {
         new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+    private Cluster metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), infos, Collections.<String>emptySet(),
             Collections.<String>emptySet());
 
     private final PartitionAssignor.Subscription subscription =
@@ -144,20 +148,20 @@ public class StreamThreadTest {
     }
 
     private static class TestStreamTask extends StreamTask {
-        public boolean committed = false;
+        boolean committed = false;
         private boolean closed;
         private boolean closedStateManager;
 
-        public TestStreamTask(TaskId id,
-                              String applicationId,
-                              Collection<TopicPartition> partitions,
-                              ProcessorTopology topology,
-                              Consumer<byte[], byte[]> consumer,
-                              Producer<byte[], byte[]> producer,
-                              Consumer<byte[], byte[]> restoreConsumer,
-                              StreamsConfig config,
-                              StreamsMetrics metrics,
-                              StateDirectory stateDirectory) {
+        TestStreamTask(TaskId id,
+                       String applicationId,
+                       Collection<TopicPartition> partitions,
+                       ProcessorTopology topology,
+                       Consumer<byte[], byte[]> consumer,
+                       Producer<byte[], byte[]> producer,
+                       Consumer<byte[], byte[]> restoreConsumer,
+                       StreamsConfig config,
+                       StreamsMetrics metrics,
+                       StateDirectory stateDirectory) {
             super(id, applicationId, partitions, topology, consumer, new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), config, metrics,
                   stateDirectory, null, new MockTime(), new RecordCollectorImpl(producer, id.toString()));
         }
@@ -207,7 +211,7 @@ public class StreamThreadTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
 
-        MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
         StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
 
             @Override
@@ -215,7 +219,7 @@ public class StreamThreadTest {
 
                 ProcessorTopology topology = builder.build(id.topicGroupId);
                 return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                    producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                    mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
             }
         };
 
@@ -234,7 +238,7 @@ public class StreamThreadTest {
 
         revokedPartitions = Collections.emptyList();
         assignedPartitions = Collections.singletonList(t1p1);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
+        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
 
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
@@ -253,7 +257,7 @@ public class StreamThreadTest {
 
         revokedPartitions = assignedPartitions;
         assignedPartitions = Collections.singletonList(t1p2);
-        expectedGroup2 = new HashSet<>(Arrays.asList(t1p2));
+        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
 
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assertFalse(thread.tasks().containsKey(task1));
@@ -294,7 +298,7 @@ public class StreamThreadTest {
 
         revokedPartitions = assignedPartitions;
         assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
+        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
 
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
@@ -308,7 +312,7 @@ public class StreamThreadTest {
 
         revokedPartitions = assignedPartitions;
         assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
+        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
 
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
@@ -333,10 +337,11 @@ public class StreamThreadTest {
             (thread.state() == StreamThread.State.NOT_RUNNING));
     }
 
-    final static String TOPIC = "topic";
-    final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
-    final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
+    private final static String TOPIC = "topic";
+    private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
+    private final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -365,21 +370,17 @@ public class StreamThreadTest {
         thread2.partitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment));
 
         // revoke (to get threads in correct state)
-        thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
-        thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+        thread1.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
+        thread2.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
 
         // assign
         thread1.rebalanceListener.onPartitionsAssigned(task0Assignment);
         thread2.rebalanceListener.onPartitionsAssigned(task1Assignment);
 
         final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
-        for (TaskId tid : thread1.tasks().keySet()) {
-            originalTaskAssignmentThread1.add(tid);
-        }
+        originalTaskAssignmentThread1.addAll(thread1.tasks().keySet());
         final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>();
-        for (TaskId tid : thread2.tasks().keySet()) {
-            originalTaskAssignmentThread2.add(tid);
-        }
+        originalTaskAssignmentThread2.addAll(thread2.tasks().keySet());
 
         // revoke (task will be suspended)
         thread1.rebalanceListener.onPartitionsRevoked(task0Assignment);
@@ -415,7 +416,7 @@ public class StreamThreadTest {
 
         private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
 
-        public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
+        MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
             this.activeTaskAssignment = activeTaskAssignment;
         }
 
@@ -490,7 +491,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
-            MockClientSupplier mockClientSupplier = new MockClientSupplier();
+            final MockClientSupplier mockClientSupplier = new MockClientSupplier();
             StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                                    0) {
 
@@ -503,7 +504,7 @@ public class StreamThreadTest {
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                        producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -620,7 +621,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
-            MockClientSupplier mockClientSupplier = new MockClientSupplier();
+            final MockClientSupplier mockClientSupplier = new MockClientSupplier();
             StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                                    0) {
 
@@ -633,7 +634,7 @@ public class StreamThreadTest {
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                        producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -691,19 +692,154 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void testInjectClients() {
-        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
-        StreamsConfig config = new StreamsConfig(configProps());
-        MockClientSupplier clientSupplier = new MockClientSupplier();
-        StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                               clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                               0);
-        assertSame(clientSupplier.producer, thread.producer);
+    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() {
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
+        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment));
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
+
+        assertEquals(1, clientSupplier.numberOfCreatedProducers);
+        assertSame(clientSupplier.producer, thread.threadProducer);
+        for (final StreamTask task : thread.tasks().values()) {
+            assertSame(clientSupplier.producer, task.producer());
+        }
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
     }
 
     @Test
+    public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final Properties properties = configProps();
+        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+        final StreamsConfig config = new StreamsConfig(properties);
+        final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
+        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
+        assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2)));
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment));
+
+        final Set<TopicPartition> assignedPartitions = new HashSet<>();
+        Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertNull(thread.threadProducer);
+        assertEquals(thread.tasks().size(), clientSupplier.numberOfCreatedProducers);
+        final Iterator<Producer<byte[], byte[]>> it = clientSupplier.producers.iterator();
+        for (final StreamTask task : thread.tasks().values()) {
+            assertSame(it.next(), task.producer());
+        }
+        assertSame(clientSupplier.consumer, thread.consumer);
+        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
+    }
+
+    private static class EoSMockClientSupplier extends MockClientSupplier {
+        final List<Producer<byte[], byte[]>> producers = new LinkedList<>();
+
+        @Override
+        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+            final Producer<byte[], byte[]> producer = new MockedProducer<>();
+            producers.add(producer);
+            ++numberOfCreatedProducers;
+            return producer;
+        }
+    }
+
+    @Test
+    public void shouldCloseAllTaskProducers() {
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final Properties properties = configProps();
+        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+        final StreamsConfig config = new StreamsConfig(properties);
+        final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
+        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment));
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
+
+        thread.close();
+        thread.run();
+
+        for (final StreamTask task : thread.tasks().values()) {
+            assertTrue(((MockedProducer) task.producer()).closed);
+        }
+    }
+
+    @Test
+    public void shouldCloseThreadProducer() {
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
+        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment));
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
+
+        thread.close();
+        thread.run();
+
+        assertTrue(((MockedProducer) thread.threadProducer).closed);
+    }
+
+    @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId(applicationId)
@@ -848,17 +984,17 @@ public class StreamThreadTest {
         builder.setApplicationId(applicationId);
         builder.stream(Pattern.compile("t.*")).to("out");
         final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
 
         final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+        final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId,
                                                      clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
                 final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer,
-                    producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                    mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 createdTasks.put(partitions, task);
                 return task;
             }
@@ -1119,10 +1255,10 @@ public class StreamThreadTest {
         partitionAssignor.onAssignment(assignments.get("client"));
     }
 
-    public static class StateListenerStub implements StreamThread.StateListener {
-        public int numChanges = 0;
-        public StreamThread.State oldState = null;
-        public StreamThread.State newState = null;
+    private static class StateListenerStub implements StreamThread.StateListener {
+        int numChanges = 0;
+        StreamThread.State oldState = null;
+        StreamThread.State newState = null;
 
         @Override
         public void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) {
@@ -1136,4 +1272,20 @@ public class StreamThreadTest {
             this.newState = newState;
         }
     }
+
+    private static final class MockedProducer<K, V> extends MockProducer<K, V> {
+        boolean closed = false;
+
+        MockedProducer() {
+            super(false, null, null);
+        }
+
+        @Override
+        public void close() {
+            if (closed) {
+                throw new IllegalStateException("MockedProducer is already closed.");
+            }
+            closed = true;
+        }
+    }
 }
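
(For reference, a minimal sketch of how an application opts into the per-task
producer behavior exercised by the tests above. This snippet is not part of
the commit; the application id and bootstrap servers are placeholders.)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // With exactly_once enabled, each StreamTask gets its own producer;
    // otherwise a single producer is shared by all tasks of a StreamThread.
    props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
    final StreamsConfig config = new StreamsConfig(props);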

http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index c867ad7..4afd442 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.test;
 
-import java.util.Map;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -26,9 +24,13 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
+import java.util.Map;
+
 public class MockClientSupplier implements KafkaClientSupplier {
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
 
+    public int numberOfCreatedProducers = 0;
+
     public final MockProducer<byte[], byte[]> producer =
             new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
     public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -36,6 +38,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
 
     @Override
     public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+        ++numberOfCreatedProducers;
         return producer;
     }
 
@@ -48,4 +51,5 @@ public class MockClientSupplier implements KafkaClientSupplier {
     public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
         return restoreConsumer;
     }
+
 }
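
(A hedged sketch, not part of this commit, of a custom KafkaClientSupplier in
the spirit of the producer counting added to MockClientSupplier above. The
class name and counter are illustrative; under exactly_once, getProducer is
invoked once per task, so the counter tracks the number of tasks rather than
the number of threads.)

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaClientSupplier;

    public class CountingClientSupplier implements KafkaClientSupplier {
        // illustrative bookkeeping, analogous to numberOfCreatedProducers above
        public final AtomicInteger producersCreated = new AtomicInteger();

        @Override
        public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
            producersCreated.incrementAndGet();
            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }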

