kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "George Bloggs (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
Date Thu, 15 Mar 2018 09:29:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400125#comment-16400125

George Bloggs commented on KAFKA-6647:

As mentioned earlier, in our code I have implemented the following and call this directly
before kafkaStreams.start(); It works and clears the directory which highlights the issue
is within the lock functionality :

 private void ourCleanUp() {
        final File baseDir = new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG));
        final File stateDir = new File(baseDir, streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
        try {
        } catch (IOException e) {
            LOGGER.error("Arrgghhhhh!! ourCleanUp failed!", e);

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> ------------------------------------------------------------------------
>                 Key: KAFKA-6647
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6647
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.1
>         Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>            Reporter: George Bloggs
>            Priority: Minor
>              Labels: streams
> When calling kafkaStreams.cleanUp() before starting a stream the StateDirectory.cleanRemovedTasks()
method contains this check:
> {code:java}
> ... Line 240
>                   if (lock(id, 0)) {
>                         long now = time.milliseconds();
>                         long lastModifiedMs = taskDir.lastModified();
>                         if (now > lastModifiedMs + cleanupDelayMs) {
>                             log.info("{} Deleting obsolete state directory {} for task
{} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs,
>                             Utils.delete(taskDir);
>                         }
>                     }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that subsequently
is going to be deleted. If the .lock file already exists from a previous run the attempt to
delete the .lock file fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will then attempt
to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail java.nio.file.DirectoryNotEmptyException
as the failed attempt to delete the .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory
      : stream-thread [restartedMain] Failed to lock the state directory due to an unexpected
> This seems to then cause issues using streams from a topic to an inMemory store.

This message was sent by Atlassian JIRA

View raw message