kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6499) Avoid creating dummy checkpoint files with no state stores
Date Thu, 01 Feb 2018 18:12:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349027#comment-16349027
] 

ASF GitHub Bot commented on KAFKA-6499:
---------------------------------------

guozhangwang closed pull request #4492: KAFKA-6499: Do not write offset checkpoint file with
empty offset map
URL: https://github.com/apache/kafka/pull/4492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d38776239a9..b270e03f2e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ public void reinitializeStateStoresForPartitions(final Logger log,
         try {
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException fatalException) {
-            log.error("Failed to update checkpoint file for global stores.", fatalException);
+            log.error("Failed to write offset checkpoint file to {} while re-initializing
{}: {}", checkpoint, stateStores, fatalException);
             throw new StreamsException("Failed to reinitialize global store.", fatalException);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8fe613..56e6bed0850 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public void checkpoint(final Map<TopicPartition, Long> offsets)
{
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global globalStores", e);
+                log.warn("Failed to write offset checkpoint file to {} for global stores:
{}", checkpoint, e);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e146a09..e7a23bd4b5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public void close(final Map<TopicPartition, Long> ackedOffsets)
throws Processor
     // write the checkpoint
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
@@ -311,14 +310,16 @@ public void checkpoint(final Map<TopicPartition, Long> ackedOffsets)
{
                 }
             }
         }
-        // write the checkpoint file before closing, to indicate clean shutdown
+        // write the checkpoint file before closing
+        if (checkpoint == null) {
+            checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+        }
+
+        log.trace("Writing checkpoint: {}", checkpointableOffsets);
         try {
-            if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-            }
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME),
e);
+            log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c147373ccf..9f0e1f8c50c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public OffsetCheckpoint(final File file) {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
+        // if there is no offsets, skip writing the file to save disk IOs
+        if (offsets.isEmpty()) {
+            return;
+        }
+
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             final File temp = new File(file.getAbsolutePath() + ".tmp");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ab9abc30e5b..31f07cc402d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -309,8 +309,8 @@ public void testFlushAndClose() throws IOException {
             false,
             logContext);
         try {
-            // make sure the checkpoint file isn't deleted
-            assertTrue(checkpointFile.exists());
+            // make sure the checkpoint file is not written yet
+            assertFalse(checkpointFile.exists());
 
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
@@ -630,7 +630,7 @@ public void close() {
 
     @Test
     public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName,
1), 123L));
         assertTrue(checkpointFile.exists());
 
         ProcessorStateManager stateManager = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 3b78d0567d8..54cd3df8623 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -18,6 +18,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +35,8 @@
 
     @Test
     public void testReadWrite() throws IOException {
-        File f = TestUtils.tempFile();
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -56,4 +57,20 @@ public void testReadWrite() throws IOException {
             checkpoint.delete();
         }
     }
+
+    @Test
+    public void shouldNotWriteCheckpointWhenNoOffsets() throws IOException {
+        // we do not need to worry about file name uniqueness since this file should not
be created
+        final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        assertFalse(f.exists());
+
+        assertEquals(Collections.<TopicPartition, Long>emptyMap(), checkpoint.read());
+
+        // deleting a non-exist checkpoint file should be fine
+        checkpoint.delete();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Avoid creating dummy checkpoint files with no state stores
> ----------------------------------------------------------
>
>                 Key: KAFKA-6499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6499
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> Today, for a streams task that contains no state stores, its processor state manager
would still write a dummy checkpoint file containing only header metadata (version, size). This
introduces unnecessary disk I/O and should be avoidable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message