kafka-jira mailing list archives

From "Steven Schlansker (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists
Date Mon, 09 Apr 2018 19:24:00 GMT
Steven Schlansker created KAFKA-6767:
----------------------------------------

             Summary: OffsetCheckpoint write assumes parent directory exists
                 Key: KAFKA-6767
                 URL: https://issues.apache.org/jira/browse/KAFKA-6767
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.1.0
            Reporter: Steven Schlansker


We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e., if an instance dies, it is recreated from scratch rather than reusing the existing RocksDB data).

We routinely see:
{code:java}
2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
	at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
	at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
{code}
Inspecting the state store directory, I can indeed see that {{chat/0_11}} does not exist (although many other partitions do).
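The failure mode can be reproduced outside Kafka. The minimal standalone sketch below (the class name {{MissingParentRepro}} and the temporary directory standing in for the state directory are hypothetical, chosen only to mirror the path in the log) shows that constructing a {{FileOutputStream}} for a file whose parent directory does not exist throws {{java.io.FileNotFoundException}}, as in the stack trace above.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MissingParentRepro {

    // Returns true if opening "<root>/chat/0_11/.checkpoint.tmp" fails because
    // the "chat/0_11" directory was never created.
    static boolean failsWithoutParent() throws IOException {
        // Hypothetical stand-in for the Streams state directory.
        Path root = Files.createTempDirectory("state");
        File tmp = root.resolve("chat").resolve("0_11").resolve(".checkpoint.tmp").toFile();
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            return false; // only reached if the parent directory existed
        } catch (FileNotFoundException e) {
            // Same exception type as in the log, e.g. "... (No such file or directory)"
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(failsWithoutParent()); // prints "true"
    }
}
```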

The {{OffsetCheckpoint.write}} method appears to open the temporary checkpoint file ({{.checkpoint.tmp}} in the log above) without first ensuring that its parent directory exists:

{code:java}
    public void write(final Map<TopicPartition, Long> offsets) throws IOException {
        // if there is no offsets, skip writing the file to save disk IOs
        if (offsets.isEmpty()) {
            return;
        }

        synchronized (lock) {
            // write to temp file and then swap with the existing file
            final File temp = new File(file.getAbsolutePath() + ".tmp");
            // ... (remainder of the method omitted from this excerpt)
{code}
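A minimal sketch of a writer that keeps the write-to-temp-then-swap approach from the excerpt but creates the parent directory before opening the temp file. This is a simplified, hypothetical class ({{DirCreatingCheckpoint}}), not Kafka's code: it keys offsets by plain strings instead of {{TopicPartition}} and writes a made-up "key offset" line format rather than the real checkpoint format.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

public class DirCreatingCheckpoint {
    private final Path file;
    private final Object lock = new Object();

    public DirCreatingCheckpoint(Path file) {
        this.file = file;
    }

    // Hypothetical format: one "key offset" line per entry.
    public void write(Map<String, Long> offsets) throws IOException {
        if (offsets.isEmpty()) {
            return; // nothing to persist
        }
        synchronized (lock) {
            // Ensure the parent directory exists before opening the temp file.
            // createDirectories is a no-op if the directory is already there.
            Path parent = file.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Path temp = file.resolveSibling(file.getFileName() + ".tmp");
            try (FileOutputStream fos = new FileOutputStream(temp.toFile());
                 BufferedWriter writer = new BufferedWriter(
                         new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
                for (Map.Entry<String, Long> e : offsets.entrySet()) {
                    writer.write(e.getKey() + " " + e.getValue());
                    writer.newLine();
                }
                writer.flush();
                fos.getFD().sync(); // force the bytes to disk before the swap
            }
            // Swap the temp file into place; fall back to a plain replace
            // where the file system cannot move atomically.
            try {
                Files.move(temp, file, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                Files.move(temp, file, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}
```

Creating the directory inside {{write}} makes the writer robust no matter which code path reaches it, at the cost of silently recreating a directory that may have been deleted on purpose.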

Either {{OffsetCheckpoint}} should create the parent directory when it is missing, or its callers should guarantee, as a precondition, that the directory exists before calling {{write}}.
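For the caller-side alternative, a minimal sketch of re-establishing the precondition before checkpointing, plus a guard that fails with a descriptive error instead of a bare {{FileNotFoundException}}. {{TaskDirectories}}, {{ensureTaskDir}} and {{requireParentDir}} are hypothetical names, not Kafka APIs, and the {{<state directory>/<application id>/<task id>}} layout is an assumption based on the {{storage/chat/0_11}} path in the log.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class TaskDirectories {
    private TaskDirectories() {}

    // Hypothetical caller-side helper: make sure the task's directory exists
    // (recreating it if it was lost) before any checkpoint is written into it.
    public static Path ensureTaskDir(Path stateDir, String applicationId, String taskId)
            throws IOException {
        Path taskDir = stateDir.resolve(applicationId).resolve(taskId);
        // Succeeds silently if the directory already exists; throws
        // FileAlreadyExistsException if a non-directory file is in the way.
        return Files.createDirectories(taskDir);
    }

    // Hypothetical writer-side guard: report a violated precondition clearly.
    public static void requireParentDir(Path checkpointFile) {
        Path parent = checkpointFile.toAbsolutePath().getParent();
        if (parent == null || !Files.isDirectory(parent)) {
            throw new IllegalStateException("Checkpoint directory does not exist: " + parent);
        }
    }
}
```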



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
