kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: make sure to go through all shutdown steps
Date Mon, 22 Feb 2016 21:16:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 46e4d4013 -> ff7b0f5b4


HOTFIX: make sure to go through all shutdown steps
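
ProcessorStateManager.close() now releases the state directory lock in a
finally block, so a store that fails to flush or close no longer leaves the
directory locked. StreamTask.close() calls super.close() before rethrowing a
deferred exception. In StreamThread, onPartitionsRevoked() moves task removal
into a finally block, and removeStreamTasks()/removeStandbyTasks() catch and
log their own failures, so shutdown() proceeds through all of its steps even
when an earlier one throws.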

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #928 from ymatsuda/shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff7b0f5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff7b0f5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff7b0f5b

Branch: refs/heads/trunk
Commit: ff7b0f5b467bdf553584fb253b00f460dfbe8943
Parents: 46e4d40
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Feb 22 13:16:06 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 22 13:16:06 2016 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        | 70 ++++++++++----------
 .../streams/processor/internals/StreamTask.java |  4 +-
 .../processor/internals/StreamThread.java       | 60 +++++++++--------
 3 files changed, 71 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c3bd82a..d449d04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -298,45 +298,47 @@ public class ProcessorStateManager {
     }
 
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
-        if (!stores.isEmpty()) {
-            log.debug("Closing stores.");
-            for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                log.debug("Closing storage engine {}", entry.getKey());
-                entry.getValue().flush();
-                entry.getValue().close();
-            }
+        try {
+            if (!stores.isEmpty()) {
+                log.debug("Closing stores.");
+                for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
+                    log.debug("Closing storage engine {}", entry.getKey());
+                    entry.getValue().flush();
+                    entry.getValue().close();
+                }
 
-            Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-            for (String storeName : stores.keySet()) {
-                TopicPartition part;
-                if (loggingEnabled.contains(storeName))
-                    part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
-                else
-                    part = new TopicPartition(storeName, getPartition(storeName));
-
-                // only checkpoint the offset to the offsets file if it is persistent;
-                if (stores.get(storeName).persistent()) {
-                    Long offset = ackedOffsets.get(part);
-
-                    if (offset != null) {
-                        // store the last offset + 1 (the log position after restoration)
-                        checkpointOffsets.put(part, offset + 1);
-                    } else {
-                        // if no record was produced, we need to check the restored offset.
-                        offset = restoredOffsets.get(part);
-                        if (offset != null)
-                            checkpointOffsets.put(part, offset);
+                Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+                for (String storeName : stores.keySet()) {
+                    TopicPartition part;
+                    if (loggingEnabled.contains(storeName))
+                        part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                    else
+                        part = new TopicPartition(storeName, getPartition(storeName));
+
+                    // only checkpoint the offset to the offsets file if it is persistent;
+                    if (stores.get(storeName).persistent()) {
+                        Long offset = ackedOffsets.get(part);
+
+                        if (offset != null) {
+                            // store the last offset + 1 (the log position after restoration)
+                            checkpointOffsets.put(part, offset + 1);
+                        } else {
+                            // if no record was produced, we need to check the restored offset.
+                            offset = restoredOffsets.get(part);
+                            if (offset != null)
+                                checkpointOffsets.put(part, offset);
+                        }
                     }
                 }
-            }
 
-            // write the checkpoint file before closing, to indicate clean shutdown
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-            checkpoint.write(checkpointOffsets);
+                // write the checkpoint file before closing, to indicate clean shutdown
+                OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+                checkpoint.write(checkpointOffsets);
+            }
+        } finally {
+            // release the state directory lock
+            directoryLock.release();
         }
-
-        // release the state directory lock
-        directoryLock.release();
     }
 
     private int getPartition(String topic) {

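The key change in this file is structural: directoryLock.release() now runs in
a finally block, so a store that throws from flush() or close() can no longer
leave the state directory locked. Below is a minimal, self-contained sketch of
the same acquire/release pattern using java.nio file locking; the path, class
name, and method bodies are illustrative assumptions, not Kafka's actual code.

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;

    public class StateDirLockSketch {
        public static void main(String[] args) throws IOException {
            File lockFile = new File("/tmp/state-dir/.lock");   // hypothetical path
            lockFile.getParentFile().mkdirs();
            try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel()) {
                FileLock directoryLock = channel.lock();        // held while stores are open
                try {
                    // flush and close stores, write the checkpoint file;
                    // any of these may throw
                } finally {
                    directoryLock.release();                    // always runs, as in the hunk above
                }
            }
        }
    }
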
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ec3d011..4d66324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -300,10 +300,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
             }
         }
 
+        super.close();
+
         if (exception != null)
             throw exception;
-
-        super.close();
     }
 
     @Override

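The StreamTask change is small but deliberate: close() collects the first
exception raised while closing the task's processors and rethrows it at the
end, and previously that rethrow happened before super.close(), skipping the
remaining cleanup. A sketch of the deferred-exception pattern, with
illustrative names (Closer, closeAll) rather than Kafka's:

    import java.util.Arrays;
    import java.util.List;

    public class DeferredCloseSketch {
        interface Closer { void close(); }

        static void closeAll(List<Closer> closers, Runnable finalCleanup) {
            RuntimeException exception = null;
            for (Closer closer : closers) {
                try {
                    closer.close();
                } catch (RuntimeException e) {
                    if (exception == null)
                        exception = e;   // remember the first failure, keep closing
                }
            }
            finalCleanup.run();          // runs unconditionally, like super.close()
            if (exception != null)
                throw exception;         // surface the failure only after cleanup
        }

        public static void main(String[] args) {
            Closer ok = () -> System.out.println("closed");
            Closer bad = () -> { throw new RuntimeException("flush failed"); };
            try {
                closeAll(Arrays.asList(ok, bad, ok), () -> System.out.println("super.close()"));
            } catch (RuntimeException e) {
                System.out.println("rethrown after cleanup: " + e.getMessage());
            }
        }
    }
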
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fc9407..70e24d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -138,14 +138,15 @@ public class StreamThread extends Thread {
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
                 commitAll();
-                // TODO: right now upon partition revocation, we always remove all the tasks;
-                // this behavior can be optimized to only remove affected tasks in the future
-                removeStreamTasks();
-                removeStandbyTasks();
                lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
+            } finally {
+                // TODO: right now upon partition revocation, we always remove all the tasks;
+                // this behavior can be optimized to only remove affected tasks in the future
+                removeStreamTasks();
+                removeStandbyTasks();
             }
         }
     };
@@ -273,6 +274,8 @@ public class StreamThread extends Thread {
     private void shutdown() {
         log.info("Shutting down stream thread [" + this.getName() + "]");
 
+        // Exceptions should not prevent this call from going through all shutdown steps
+
         try {
             commitAll();
         } catch (Throwable e) {
@@ -299,13 +302,8 @@ public class StreamThread extends Thread {
            log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
         }
 
-        // Exceptions should not prevent this call from going through all shutdown steps
-        try {
-            removeStreamTasks();
-            removeStandbyTasks();
-        } catch (Throwable e) {
-            // already logged in removeStreamTasks() and removeStandbyTasks()
-        }
+        removeStreamTasks();
+        removeStandbyTasks();
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }
@@ -627,15 +625,19 @@ public class StreamThread extends Thread {
     }
 
     private void removeStreamTasks() {
-        for (StreamTask task : activeTasks.values()) {
-            closeOne(task);
-        }
+        try {
+            for (StreamTask task : activeTasks.values()) {
+                closeOne(task);
+            }
+            prevTasks.clear();
+            prevTasks.addAll(activeTasks.keySet());
 
-        prevTasks.clear();
-        prevTasks.addAll(activeTasks.keySet());
+            activeTasks.clear();
+            activeTasksByPartition.clear();
 
-        activeTasks.clear();
-        activeTasksByPartition.clear();
+        } catch (Exception e) {
+            log.error("Failed to remove stream tasks in thread [" + this.getName() + "]: ", e);
+        }
     }
 
     private void closeOne(AbstractTask task) {
@@ -644,7 +646,6 @@ public class StreamThread extends Thread {
             task.close();
         } catch (StreamsException e) {
            log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-            throw e;
         }
         sensors.taskDestructionSensor.record();
     }
@@ -701,15 +702,20 @@ public class StreamThread extends Thread {
 
 
     private void removeStandbyTasks() {
-        for (StandbyTask task : standbyTasks.values()) {
-            closeOne(task);
-        }
-        // un-assign the change log partitions
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        try {
+            for (StandbyTask task : standbyTasks.values()) {
+                closeOne(task);
+            }
+            standbyTasks.clear();
+            standbyTasksByPartition.clear();
+            standbyRecords.clear();
 
-        standbyTasks.clear();
-        standbyTasksByPartition.clear();
-        standbyRecords.clear();
+            // un-assign the change log partitions
+            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+
+        } catch (Exception e) {
+            log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", e);
+        }
     }
 
    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {

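The StreamThread changes apply the same principle throughout: task removal
moves into a finally block in onPartitionsRevoked(), removeStreamTasks() and
removeStandbyTasks() catch and log their own failures, and closeOne() no
longer rethrows, so every shutdown step gets a chance to run. A self-contained
sketch of the revocation path; the names are illustrative and commitAll() is
just a stand-in that throws.

    public class RevocationSketch {
        private volatile Throwable rebalanceException;

        void commitAll() { throw new RuntimeException("commit failed"); }
        void removeStreamTasks()  { System.out.println("stream tasks removed"); }
        void removeStandbyTasks() { System.out.println("standby tasks removed"); }

        public void onPartitionsRevoked() {
            try {
                commitAll();
            } catch (Throwable t) {
                rebalanceException = t;  // recorded so the caller can surface it later
                throw t;                 // precise rethrow: only unchecked exceptions reach here
            } finally {
                removeStreamTasks();     // now runs even when the commit throws
                removeStandbyTasks();
            }
        }

        public static void main(String[] args) {
            try {
                new RevocationSketch().onPartitionsRevoked();
            } catch (Throwable t) {
                System.out.println("rebalance failed: " + t.getMessage());
            }
        }
    }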
