kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4561; Ordering of operations in StreamThread.shutdownTasksAndState may void at-least-once guarantees
Date Tue, 03 Jan 2017 16:34:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7411ed89b -> cea2a669b


KAFKA-4561; Ordering of operations in StreamThread.shutdownTasksAndState may void at-least-once guarantees

In `shutdownTasksAndState` and `suspendTasksAndState` we commit offsets BEFORE we flush any state. This is wrong: if an exception occurs during a flush, we may violate the at-least-once guarantee, i.e., we would have committed some offsets but NOT sent the processed data on to the sinks.
Also, during suspend and shutdown we should try to complete the work for all tasks even when exceptions occur, keeping track of the first exception and rethrowing it at the end if necessary. This helps ensure that StateStores etc. are closed.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2281 from dguy/kafka-4561


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cea2a669
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cea2a669
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cea2a669

Branch: refs/heads/trunk
Commit: cea2a669b34b2f94d445aace2b6e95e70af7e2f6
Parents: 7411ed8
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jan 3 08:34:15 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 3 08:34:15 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |   5 +-
 .../internals/ProcessorStateManager.java        |  48 ++--
 .../processor/internals/RecordCollector.java    |   5 +
 .../internals/RecordCollectorImpl.java          |   3 +-
 .../processor/internals/StandbyContextImpl.java |   7 +
 .../streams/processor/internals/StreamTask.java |  20 +-
 .../processor/internals/StreamThread.java       | 129 +++++------
 .../integration/utils/IntegrationTestUtils.java |   2 +
 .../internals/ProcessorStateManagerTest.java    |  24 ++
 .../processor/internals/StandbyTaskTest.java    |   4 +-
 .../processor/internals/StreamTaskTest.java     |  34 ++-
 .../processor/internals/StreamThreadTest.java   | 220 ++++++++++++++++++-
 .../StreamThreadStateStoreProviderTest.java     |   7 +-
 .../kafka/test/MockStateStoreSupplier.java      |  17 +-
 .../apache/kafka/test/NoOpRecordCollector.java  |  16 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  18 +-
 16 files changed, 431 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 127a64e..622426d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -122,11 +122,12 @@ public abstract class AbstractTask {
 
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
+     * @param writeCheckpoint boolean indicating if a checkpoint file should be written
      */
-    void closeStateManager() {
+    void closeStateManager(final boolean writeCheckpoint) {
         log.trace("task [{}] Closing", id());
         try {
-            stateMgr.close(recordCollectorOffsets());
+            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
         } catch (IOException e) {
             throw new ProcessorStateException("Error while closing the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 821b260..c81df6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -357,33 +357,35 @@ public class ProcessorStateManager {
                     }
                 }
 
-                Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-                for (String storeName : stores.keySet()) {
-                    TopicPartition part;
-                    if (loggingEnabled.contains(storeName))
-                        part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
-                    else
-                        part = new TopicPartition(storeName, getPartition(storeName));
-
-                    // only checkpoint the offset to the offsets file if it is persistent;
-                    if (stores.get(storeName).persistent()) {
-                        Long offset = ackedOffsets.get(part);
-
-                        if (offset != null) {
-                            // store the last offset + 1 (the log position after restoration)
-                            checkpointOffsets.put(part, offset + 1);
-                        } else {
-                            // if no record was produced. we need to check the restored offset.
-                            offset = restoredOffsets.get(part);
-                            if (offset != null)
-                                checkpointOffsets.put(part, offset);
+                if (ackedOffsets != null) {
+                    Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+                    for (String storeName : stores.keySet()) {
+                        TopicPartition part;
+                        if (loggingEnabled.contains(storeName))
+                            part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
+                        else
+                            part = new TopicPartition(storeName, getPartition(storeName));
+
+                        // only checkpoint the offset to the offsets file if it is persistent;
+                        if (stores.get(storeName).persistent()) {
+                            Long offset = ackedOffsets.get(part);
+
+                            if (offset != null) {
+                                // store the last offset + 1 (the log position after restoration)
+                                checkpointOffsets.put(part, offset + 1);
+                            } else {
+                                // if no record was produced. we need to check the restored offset.
+                                offset = restoredOffsets.get(part);
+                                if (offset != null)
+                                    checkpointOffsets.put(part, offset);
+                            }
                         }
                     }
+                    // write the checkpoint file before closing, to indicate clean shutdown
+                    OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+                    checkpoint.write(checkpointOffsets);
                 }
 
-                // write the checkpoint file before closing, to indicate clean shutdown
-                OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-                checkpoint.write(checkpointOffsets);
             }
         } finally {
             // release the state directory directoryLock
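
Taken together with the `closeStateManager(writeCheckpoint)` change above, the new `ackedOffsets != null` guard means the checkpoint file is written only on a clean close: an unclean close passes `null`, no checkpoint is written, and the store is rebuilt from the changelog on restart. A minimal, runnable sketch of that decision logic (the string-keyed map and ad-hoc file format are simplified stand-ins for the real `TopicPartition` keys and `OffsetCheckpoint` class):

    import java.io.File;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointSketch {

        // Stand-in for OffsetCheckpoint.write(): one "partition offset" line per entry.
        static void writeCheckpoint(File file, Map<String, Long> offsets) throws IOException {
            try (PrintWriter out = new PrintWriter(file)) {
                for (Map.Entry<String, Long> e : offsets.entrySet()) {
                    out.println(e.getKey() + " " + e.getValue());
                }
            }
        }

        // Mirrors the guarded close: a null ackedOffsets map signals an unclean
        // close, so no checkpoint is written and restoration falls back to the changelog.
        static void close(File checkpointFile, Map<String, Long> ackedOffsets) throws IOException {
            if (ackedOffsets != null) {
                Map<String, Long> checkpointOffsets = new HashMap<>();
                for (Map.Entry<String, Long> e : ackedOffsets.entrySet()) {
                    // store the last acked offset + 1 (the log position after restoration)
                    checkpointOffsets.put(e.getKey(), e.getValue() + 1);
                }
                writeCheckpoint(checkpointFile, checkpointOffsets);
            }
        }
    }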

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 6d7d561..8b0dcdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
+import java.util.Map;
+
 public interface RecordCollector {
     <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
 
@@ -30,6 +33,8 @@ public interface RecordCollector {
 
     void close();
 
+    Map<TopicPartition, Long> offsets();
+
     /**
      * A supplier of a {@link RecordCollectorImpl} instance.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 31596cc..d733e66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -131,7 +131,8 @@ public class RecordCollectorImpl implements RecordCollector {
      *
      * @return the map from TopicPartition to offset
      */
-    Map<TopicPartition, Long> offsets() {
+    @Override
+    public Map<TopicPartition, Long> offsets() {
         return this.offsets;
     }
 }
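
Promoting `offsets()` from `RecordCollectorImpl` onto the `RecordCollector` interface is what lets `StreamTask` hold the interface type and lets tests inject a stub collector. A hedged sketch of the shape (the interface here is trimmed; the real `RecordCollector` also declares the `send(...)` overloads shown above, and uses `TopicPartition` rather than `String` keys):

    import java.util.Collections;
    import java.util.Map;

    // Trimmed stand-in for the real RecordCollector interface.
    interface CollectorSketch {
        void flush();
        void close();
        Map<String, Long> offsets(); // partition -> last produced offset
    }

    // A do-nothing stub in the spirit of the test-side NoOpRecordCollector, and of
    // the collector in StandbyContextImpl below: standby tasks produce nothing,
    // so an empty offsets map is the right answer.
    class NoOpCollectorSketch implements CollectorSketch {
        @Override public void flush() { }
        @Override public void close() { }
        @Override public Map<String, Long> offsets() {
            return Collections.emptyMap();
        }
    }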

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 9ce6595..a660b15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
+import java.util.Collections;
 import java.util.Map;
 
 public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
@@ -52,6 +54,11 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         public void close() {
 
         }
+
+        @Override
+        public Map<TopicPartition, Long> offsets() {
+            return Collections.emptyMap();
+        }
     };
 
     private final TaskId id;

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ae374ce..9369c01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
@@ -55,7 +54,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private final Map<TopicPartition, RecordQueue> partitionQueues;
 
     private final Map<TopicPartition, Long> consumedOffsets;
-    private final RecordCollectorImpl recordCollector;
+    private final RecordCollector recordCollector;
     private final int maxBufferedSize;
 
     private boolean commitRequested = false;
@@ -70,23 +69,23 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
-     * @param producer              the instance of {@link Producer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
+     * @param recordCollector       the instance of {@link RecordCollector} used to produce records
      */
     public StreamTask(TaskId id,
                       String applicationId,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
-                      Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
                       StreamsMetrics metrics,
                       StateDirectory stateDirectory,
-                      ThreadCache cache) {
+                      ThreadCache cache,
+                      final RecordCollector recordCollector) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -110,10 +109,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = new RecordCollectorImpl(producer, id().toString());
+        this.recordCollector = recordCollector;
 
         // initialize the topology with its own context
-        this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
+        this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
 
         // initialize the state stores
         log.info("{} Initializing state stores", logPrefix);
@@ -361,7 +360,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
         log.debug("{} Closing processor topology", logPrefix);
 
         this.partitionGroup.close();
-        this.consumedOffsets.clear();
         closeTopology();
     }
 
@@ -387,5 +385,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return super.toString();
     }
 
-
+    @Override
+    public void flushState() {
+        super.flushState();
+        recordCollector.flush();
+    }
 }
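
The new `flushState()` override pins down the flush order: state stores first (via `super.flushState()`), then the producer buffer (via `recordCollector.flush()`), so any records emitted while the stores flush are also pushed out before the thread commits offsets. A small illustrative sketch of that ordering (the names here are invented, not the real classes):

    // Illustrative only: the ordering is what matters.
    interface StoreSketch { void flush(); }
    interface ProducerSketch { void flush(); }

    class FlushOrderSketch {
        private final StoreSketch stores;
        private final ProducerSketch producer;

        FlushOrderSketch(StoreSketch stores, ProducerSketch producer) {
            this.stores = stores;
            this.producer = producer;
        }

        void flushState() {
            stores.flush();   // 1. flush state stores; this may itself produce changelog records
            producer.flush(); // 2. flush everything produced so far, including by step 1
        }
    }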

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f09bf54..626de7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -64,6 +64,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
@@ -252,14 +253,14 @@ public class StreamThread extends Thread {
             try {
                 if (state == State.PENDING_SHUTDOWN) {
                     log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.",
-                        StreamThread.this.getName(), assignment);
+                             StreamThread.this.getName(), assignment);
                 }
                 log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.",
-                        StreamThread.this.getName(), assignment);
+                         StreamThread.this.getName(), assignment);
                 setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 // suspend active tasks
-                suspendTasksAndState(true);
+                suspendTasksAndState();
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -385,7 +386,7 @@ public class StreamThread extends Thread {
 
     private void shutdown() {
         log.info("{} Shutting down", logPrefix);
-        shutdownTasksAndState(false);
+        shutdownTasksAndState();
 
         // close all embedded clients
         try {
@@ -407,7 +408,9 @@ public class StreamThread extends Thread {
         // TODO remove this
         // hotfix to improve ZK behavior als long as KAFKA-4060 is not fixed (c.f. KAFKA-4369)
         // when removing this, make StreamPartitionAssignor#internalTopicManager "private" again
-        partitionAssignor.internalTopicManager.zkClient.close();
+        if (partitionAssignor != null && partitionAssignor.internalTopicManager != null) {
+            partitionAssignor.internalTopicManager.zkClient.close();
+        }
 
         // remove all tasks
         removeStreamTasks();
@@ -417,120 +420,122 @@ public class StreamThread extends Thread {
         setState(State.NOT_RUNNING);
     }
 
-    private void unAssignChangeLogPartitions(final boolean rethrowExceptions) {
+    private RuntimeException unAssignChangeLogPartitions() {
         try {
             // un-assign the change log partitions
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-        } catch (Exception e) {
+        } catch (RuntimeException e) {
             log.error("{} Failed to un-assign change log partitions: ", logPrefix, e);
-            if (rethrowExceptions) {
-                throw e;
-            }
+            return e;
         }
+        return null;
     }
 
 
-    private void shutdownTasksAndState(final boolean rethrowExceptions) {
+    @SuppressWarnings("ThrowableNotThrown")
+    private void shutdownTasksAndState() {
         log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
 
-        // only commit under clean exit
-        if (cleanRun) {
-            // Commit first as there may be cached records that have not been flushed yet.
-            commitOffsets(rethrowExceptions);
-        }
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all processors in topology order
-        closeAllTasks();
+        firstException.compareAndSet(null, closeAllTasks());
         // flush state
-        flushAllState(rethrowExceptions);
-        // flush out any extra data sent during close
-        producer.flush();
-        // Close all task state managers
-        closeAllStateManagers(rethrowExceptions);
+        firstException.compareAndSet(null, flushAllState());
+        // Close all task state managers. Don't need to set exception as all
+        // state would have been flushed above
+        closeAllStateManagers(firstException.get() == null);
+        // only commit under clean exit
+        if (cleanRun && firstException.get() == null) {
+            firstException.set(commitOffsets());
+        }
         // remove the changelog partitions from restore consumer
-        unAssignChangeLogPartitions(rethrowExceptions);
+        unAssignChangeLogPartitions();
     }
 
 
     /**
      * Similar to shutdownTasksAndState, however does not close the task managers,
      * in the hope that soon the tasks will be assigned again
-     * @param rethrowExceptions
      */
-    private void suspendTasksAndState(final boolean rethrowExceptions) {
+    private void suspendTasksAndState()  {
         log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
-
-        // Commit first as there may be cached records that have not been flushed yet.
-        commitOffsets(rethrowExceptions);
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all topology nodes
-        closeAllTasksTopologies();
+        firstException.compareAndSet(null, closeAllTasksTopologies());
         // flush state
-        flushAllState(rethrowExceptions);
-        // flush out any extra data sent during close
-        producer.flush();
+        firstException.compareAndSet(null, flushAllState());
+        // only commit after all state has been flushed and there hasn't been an exception
+        if (firstException.get() == null) {
+            firstException.set(commitOffsets());
+        }
         // remove the changelog partitions from restore consumer
-        unAssignChangeLogPartitions(rethrowExceptions);
+        firstException.compareAndSet(null, unAssignChangeLogPartitions());
 
         updateSuspendedTasks();
 
+        if (firstException.get() != null) {
+            throw new StreamsException(logPrefix + " failed to suspend stream tasks", firstException.get());
+        }
     }
 
     interface AbstractTaskAction {
         void apply(final AbstractTask task);
     }
 
-    private void performOnAllTasks(final AbstractTaskAction action,
-                                   final String exceptionMessage,
-                                   final boolean throwExceptions) {
+    private RuntimeException performOnAllTasks(final AbstractTaskAction action,
+                                   final String exceptionMessage) {
+        RuntimeException firstException = null;
         final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
         allTasks.addAll(standbyTasks.values());
         for (final AbstractTask task : allTasks) {
             try {
                 action.apply(task);
-            } catch (KafkaException e) {
-                log.error("{} Failed while executing {} {} duet to {}: ",
+            } catch (RuntimeException t) {
+                log.error("{} Failed while executing {} {} due to {}: ",
                         StreamThread.this.logPrefix,
                         task.getClass().getSimpleName(),
                         task.id(),
                         exceptionMessage,
-                        e);
-                if (throwExceptions) {
-                    throw e;
+                        t);
+                if (firstException == null) {
+                    firstException = t;
                 }
             }
         }
+        return firstException;
     }
 
-    private void closeAllStateManagers(final boolean throwExceptions) {
-        performOnAllTasks(new AbstractTaskAction() {
+    private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
+        return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id());
-                task.closeStateManager();
+                task.closeStateManager(writeCheckpoint);
             }
-        }, "close state manager", throwExceptions);
+        }, "close state manager");
     }
 
-    private void commitOffsets(final boolean throwExceptions) {
+    private RuntimeException commitOffsets() {
         // Exceptions should not prevent this call from going through all shutdown steps
-        performOnAllTasks(new AbstractTaskAction() {
+        return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id());
                 task.commitOffsets();
             }
-        }, "commit consumer offsets", throwExceptions);
+        }, "commit consumer offsets");
     }
 
-    private void flushAllState(final boolean throwExceptions) {
-        performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException flushAllState() {
+        return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id());
                 task.flushState();
             }
-        }, "flush state", throwExceptions);
+        }, "flush state");
     }
 
     /**
@@ -788,9 +793,9 @@ public class StreamThread extends Thread {
 
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(id.topicGroupId);
-
-        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory, cache);
+        final ProcessorTopology topology = builder.build(id.topicGroupId);
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
+        return new StreamTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory, cache, recordCollector);
     }
 
     private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
@@ -824,7 +829,7 @@ public class StreamThread extends Thread {
                 log.debug("{} closing suspended non-assigned task", logPrefix);
                 try {
                     task.close();
-                    task.closeStateManager();
+                    task.closeStateManager(true);
                 } catch (Exception e) {
                     log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(), e);
                 } finally {
@@ -845,7 +850,7 @@ public class StreamThread extends Thread {
                 final StandbyTask task = suspendedTask.getValue();
                 try {
                     task.close();
-                    task.closeStateManager();
+                    task.closeStateManager(true);
                 } catch (Exception e) {
                     log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e);
                 } finally {
@@ -997,26 +1002,26 @@ public class StreamThread extends Thread {
         standbyRecords.clear();
     }
 
-    private void closeAllTasks() {
-        performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException closeAllTasks() {
+        return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
                 task.close();
                 sensors.taskDestructionSensor.record();
             }
-        }, "close", false);
+        }, "close");
     }
 
-    private void closeAllTasksTopologies() {
-        performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException closeAllTasksTopologies() {
+        return performOnAllTasks(new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id());
                 task.closeTopology();
                 sensors.taskDestructionSensor.record();
             }
-        }, "close", false);
+        }, "close");
     }
 
     /**
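
The heart of the fix is in the two methods above: close (or suspend) every task, flush all state, and only then commit offsets, while remembering the first exception instead of throwing immediately so the remaining tasks still get cleaned up. Committing last is what preserves at-least-once: if any flush fails, nothing is committed and the input records are reprocessed after restart. A self-contained sketch of the pattern, assuming simplified `Task` stand-ins rather than the real `AbstractTask` hierarchy:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    public class ShutdownSketch {

        interface Task {
            String id();
            void close();
            void flushState();
            void commitOffsets();
        }

        interface TaskAction {
            void apply(Task task);
        }

        // Apply an action to every task; log failures, remember only the first, keep going.
        static RuntimeException performOnAllTasks(List<Task> tasks, TaskAction action, String what) {
            RuntimeException firstException = null;
            for (Task task : tasks) {
                try {
                    action.apply(task);
                } catch (RuntimeException e) {
                    System.err.println("Failed to " + what + " for task " + task.id() + ": " + e);
                    if (firstException == null) {
                        firstException = e;
                    }
                }
            }
            return firstException;
        }

        // Mirrors the new suspend ordering: close topologies, flush state, and
        // commit offsets only if nothing has failed so far; rethrow at the end.
        static void suspendTasksAndState(List<Task> tasks) {
            final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
            firstException.compareAndSet(null, performOnAllTasks(tasks, Task::close, "close topology"));
            firstException.compareAndSet(null, performOnAllTasks(tasks, Task::flushState, "flush state"));
            if (firstException.get() == null) {
                firstException.set(performOnAllTasks(tasks, Task::commitOffsets, "commit offsets"));
            }
            if (firstException.get() != null) {
                throw new RuntimeException("failed to suspend stream tasks", firstException.get());
            }
        }
    }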

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index fe20225..aa358ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -102,7 +102,9 @@ public class IntegrationTestUtils {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
             }
         }
+
         consumer.close();
+
         return consumedValues;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 9198fa9..7023712 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -474,4 +474,28 @@ public class ProcessorStateManagerTest {
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
+    @Test
+    public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception {
+        final TaskId taskId = new TaskId(0, 1);
+        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        // write an empty checkpoint file
+        final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
+        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+
+        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+
+        final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.close(null);
+        assertFalse(checkpointFile.exists());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b28b8d2..10a86fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -202,7 +202,7 @@ public class StandbyTaskTest {
         assertEquals(Collections.emptyList(), store1.keys);
         assertEquals(Utils.mkList(1, 2, 3), store2.keys);
 
-        task.closeStateManager();
+        task.closeStateManager(true);
 
         File taskDir = stateDirectory.directoryForTask(taskId);
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -295,7 +295,7 @@ public class StandbyTaskTest {
         remaining = task.update(ktable, remaining);
         assertNull(remaining);
 
-        task.closeStateManager();
+        task.closeStateManager(true);
 
         File taskDir = stateDirectory.directoryForTask(taskId);
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 15f93d9..3d50007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -88,6 +90,7 @@ public class StreamTaskTest {
             Collections.<StateStore, ProcessorNode>emptyMap());
     private File baseDir;
     private StateDirectory stateDirectory;
+    private RecordCollectorImpl recordCollector;
 
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
@@ -127,7 +130,8 @@ public class StreamTaskTest {
     @Test
     public void testProcessOrder() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
+        recordCollector = new RecordCollectorImpl(producer, "taskId");
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -174,7 +178,7 @@ public class StreamTaskTest {
     @Test
     public void testPauseResume() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
+        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -233,7 +237,7 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuate() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -312,7 +316,7 @@ public class StreamTaskTest {
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore, ProcessorNode>emptyMap());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0));
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector);
         final int offset = 20;
         streamTask.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -360,7 +364,7 @@ public class StreamTaskTest {
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore, ProcessorNode>emptyMap());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0));
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector);
 
         try {
             streamTask.punctuate(punctuator, 1);
@@ -372,6 +376,26 @@ public class StreamTaskTest {
 
     }
 
+    @Test
+    public void shouldFlushRecordCollectorOnFlushState() throws Exception {
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 Collections.<String, String>emptyMap(),
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
+        final AtomicBoolean flushed = new AtomicBoolean(false);
+        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+            @Override
+            public void flush() {
+                flushed.set(true);
+            }
+        };
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "appId", partitions, topology, consumer, restoreStateConsumer, createConfig(baseDir), null, stateDirectory, new ThreadCache(0), recordCollector);
+        streamTask.flushState();
+        assertTrue(flushed.get());
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e8c10e4..442fc3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -27,16 +27,19 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
@@ -64,6 +67,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -145,7 +149,17 @@ public class StreamThreadTest {
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config,
                               StateDirectory stateDirectory) {
-            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory, null);
+            super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, new StreamsMetrics() {
+                @Override
+                public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+                    return null;
+                }
+
+                @Override
+                public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+
+                }
+            }, stateDirectory, null, new RecordCollectorImpl(producer, id.toString()));
         }
 
         @Override
@@ -172,8 +186,8 @@ public class StreamThreadTest {
         }
 
         @Override
-        void closeStateManager() {
-            super.closeStateManager();
+        void closeStateManager(boolean writeCheckpoint) {
+            super.closeStateManager(writeCheckpoint);
             this.closedStateManager = true;
         }
     }
@@ -831,6 +845,206 @@ public class StreamThreadTest {
         assertThat(createdTasks.get(task00Partitions).id(), is(taskId));
     }
 
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        builder.stream("t1").groupByKey();
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
+                                                                 applicationId,
+                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
+                                                                 builder.build(0),
+                                                                 clientSupplier.consumer,
+                                                                 clientSupplier.producer,
+                                                                 clientSupplier.restoreConsumer,
+                                                                 config,
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final StreamsConfig config1 = new StreamsConfig(configProps());
+
+        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return testStreamTask;
+            }
+        };
+
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+
+        thread.start();
+        thread.close();
+        thread.join();
+        assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed);
+
+
+    }
+
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
+        builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
+                                                                 applicationId,
+                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
+                                                                 builder.build(0),
+                                                                 clientSupplier.consumer,
+                                                                 clientSupplier.producer,
+                                                                 clientSupplier.restoreConsumer,
+                                                                 config,
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+            @Override
+            public void flushState() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return testStreamTask;
+            }
+        };
+
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        // store should have been opened
+        assertTrue(stateStore.isOpen());
+
+        thread.start();
+        thread.close();
+        thread.join();
+        assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed);
+        // store should be closed even if we had an exception
+        assertFalse(stateStore.isOpen());
+    }
+
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        builder.stream("t1").groupByKey();
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
+                                                                 applicationId,
+                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
+                                                                 builder.build(0),
+                                                                 clientSupplier.consumer,
+                                                                 clientSupplier.producer,
+                                                                 clientSupplier.restoreConsumer,
+                                                                 config,
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+            @Override
+            public void closeTopology() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final StreamsConfig config1 = new StreamsConfig(configProps());
+
+        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return testStreamTask;
+            }
+        };
+
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        try {
+            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+            fail("should have thrown exception");
+        } catch (Exception e) {
+            // expected
+        }
+        assertFalse(testStreamTask.committed);
+    }
+
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        builder.stream("t1").groupByKey();
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
+                                                                 applicationId,
+                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
+                                                                 builder.build(0),
+                                                                 clientSupplier.consumer,
+                                                                 clientSupplier.producer,
+                                                                 clientSupplier.restoreConsumer,
+                                                                 config,
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+            @Override
+            public void flushState() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final StreamsConfig config1 = new StreamsConfig(configProps());
+
+        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                return testStreamTask;
+            }
+        };
+
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        try {
+            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+            fail("should have thrown exception");
+        } catch (Exception e) {
+            // expected
+        }
+        assertFalse(testStreamTask.committed);
+
+    }
+
+
     private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9758d89..fe53ec0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -183,11 +184,10 @@ public class StreamThreadStateStoreProviderTest {
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
         return new StreamTask(taskId, applicationId, Collections
-            .singletonList(new TopicPartition("topic", taskId.partition)), topology,
+                .singletonList(new TopicPartition("topic", taskId.partition)), topology,
                               clientSupplier.consumer,
-                              clientSupplier.producer,
                               clientSupplier.restoreConsumer,
-                              streamsConfig, new TheStreamMetrics(), stateDirectory, null) {
+                              streamsConfig, new TheStreamMetrics(), stateDirectory, null, new NoOpRecordCollector()) {
             @Override
             protected void initializeOffsetLimits() {
 
@@ -237,4 +237,5 @@ public class StreamThreadStateStoreProviderTest {
 
         }
     }
+
 }

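(A condensed view of the constructor change this test exercises, with argument
names abbreviated and illustrative: StreamTask no longer takes a producer and
builds its own collector; the collector is injected, which is what lets the
test pass a NoOpRecordCollector.)

    // before: producer passed in, collector created internally
    //   StreamTask(id, appId, partitions, topology, consumer, producer,
    //              restoreConsumer, config, metrics, stateDir, cache)
    // after: collector injected, producer argument removed
    //   StreamTask(id, appId, partitions, topology, consumer,
    //              restoreConsumer, config, metrics, stateDir, cache,
    //              recordCollector)
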
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 3532623..d1fe213 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -29,9 +29,10 @@ import java.util.Collections;
 import java.util.Map;
 
 public class MockStateStoreSupplier implements StateStoreSupplier {
-    private final String name;
-    private final boolean persistent;
-    private final boolean loggingEnabled;
+    private String name;
+    private boolean persistent;
+    private boolean loggingEnabled;
+    private MockStateStore stateStore;
 
     public MockStateStoreSupplier(String name, boolean persistent) {
         this(name, persistent, true);
@@ -43,6 +44,10 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         this.loggingEnabled = loggingEnabled;
     }
 
+    public MockStateStoreSupplier(final MockStateStore stateStore) {
+        this.stateStore = stateStore;
+    }
+
     @Override
     public String name() {
         return name;
@@ -50,6 +55,9 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
     @Override
     public StateStore get() {
+        if (stateStore != null) {
+            return stateStore;
+        }
         if (loggingEnabled) {
             return new MockStateStore(name, persistent).enableLogging();
         } else {
@@ -74,7 +82,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         public boolean loggingEnabled = false;
         public boolean initialized = false;
         public boolean flushed = false;
-        public boolean closed = false;
+        public boolean closed = true;
         public final ArrayList<Integer> keys = new ArrayList<>();
 
         public MockStateStore(String name, boolean persistent) {
@@ -96,6 +104,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         public void init(ProcessorContext context, StateStore root) {
             context.register(root, loggingEnabled, stateRestoreCallback);
             initialized = true;
+            closed = false;
         }
 
         @Override

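(A brief usage sketch for the injection constructor added above; the assertion
style is illustrative. A test can hand the supplier a pre-built store and
inspect its flags after the topology has run.)

    final MockStateStoreSupplier.MockStateStore store =
        new MockStateStoreSupplier.MockStateStore("store", false);
    final StateStoreSupplier supplier = new MockStateStoreSupplier(store);

    assertSame(store, supplier.get()); // the injected instance is returned
    assertTrue(store.closed);          // starts closed; init() flips it to
                                       // false, close() back to true
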
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index d4368d3..c32ed08 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -17,14 +17,15 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 
-public class NoOpRecordCollector extends RecordCollectorImpl {
-    public NoOpRecordCollector() {
-        super(null, "NoOpRecordCollector");
-    }
+import java.util.Collections;
+import java.util.Map;
+
+public class NoOpRecordCollector implements RecordCollector {
 
     @Override
     public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
@@ -45,4 +46,9 @@ public class NoOpRecordCollector extends RecordCollectorImpl {
     public void close() {
         //no-op
     }
+
+    @Override
+    public Map<TopicPartition, Long> offsets() {
+        return Collections.emptyMap();
+    }
 }

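(With NoOpRecordCollector now implementing the RecordCollector interface
directly instead of extending RecordCollectorImpl, it needs no producer at
all. A minimal, hypothetical test usage:)

    final RecordCollector collector = new NoOpRecordCollector();
    collector.send(new ProducerRecord<>("topic", "key", "value"),
                   new StringSerializer(), new StringSerializer()); // dropped
    collector.flush();                          // no-op
    assertTrue(collector.offsets().isEmpty());  // nothing is tracked
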
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea2a669/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 83a9092..7dad408 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -172,14 +173,13 @@ public class ProcessorTopologyTestDriver {
         consumer.assign(offsetsByTopicPartition.keySet());
 
         task = new StreamTask(id,
-            applicationId,
-            partitionsByTopic.values(),
-            topology,
-            consumer,
-            producer,
-            restoreStateConsumer,
-            config,
-            new StreamsMetrics() {
+                              applicationId,
+                              partitionsByTopic.values(),
+                              topology,
+                              consumer,
+                              restoreStateConsumer,
+                              config,
+                              new StreamsMetrics() {
                 @Override
                 public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
                     return null;
@@ -189,7 +189,7 @@ public class ProcessorTopologyTestDriver {
                 public void recordLatency(Sensor sensor, long startNs, long endNs) {
                     // do nothing
                 }
-            }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024));
+            }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024), new RecordCollectorImpl(producer, "id"));
     }
 
     /**


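(The driver now builds the collector from its producer and injects it, in line
with the StreamTask change above; roughly, with hypothetical variable names:)

    final MockProducer<byte[], byte[]> producer =
        new MockProducer<>(true, new ByteArraySerializer(), new ByteArraySerializer());
    final RecordCollector collector = new RecordCollectorImpl(producer, "id");
    // task output flows through the injected collector, so a test can still
    // inspect producer.history() after driving records through the topology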