Date: Wed, 22 Mar 2017 03:29:41 +0000 (UTC)
From: "Guozhang Wang (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-4890) State directory being deleted when another thread holds the lock

    [ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935729#comment-15935729 ]

Guozhang Wang commented on KAFKA-4890:
--------------------------------------

[~yunolgun] Thanks for the updates. I tried to reproduce the above sequential unit test on macOS, but it seems `FileLock` does prevent threads within the same JVM from simultaneously holding locks on the same file; instead, an `OverlappingFileLockException` is thrown. Googling more about `FileLock` suggests that its implementation depends on both the operating system and the Java version. That is to say, it might not be 100% safe to rely on `FileLock` on all operating systems (btw, which Java version are you using?). Given this, I will leave this JIRA open as is until we have more clues on how to reproduce this issue in a specific environment.
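A minimal sketch of the same-JVM experiment described in the comment, assuming Java 8 or later. The class name `SameJvmFileLockDemo`, the temp-file names, and the two-thread setup are hypothetical illustrations; this is not the unit test referenced in the comment and not code from Kafka's `StateDirectory`. Thread A holds an exclusive `FileLock`; thread B opens its own `FileChannel` on the same file and calls `tryLock()`, and the sketch reports whether B was refused with `OverlappingFileLockException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: two threads in one JVM contend for a FileLock on the same file.
public class SameJvmFileLockDemo {

    /** Returns true if the second thread's tryLock() throws OverlappingFileLockException. */
    static boolean secondThreadIsRefused() {
        Path lockFile = null;
        try {
            lockFile = Files.createTempFile("state-dir", ".lock");
            final Path path = lockFile;
            AtomicBoolean refused = new AtomicBoolean(false);
            // Thread A (the calling thread): exclusive lock on the whole file.
            try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE);
                 FileLock held = channel.lock()) {
                Thread other = new Thread(() -> {
                    // Thread B: a separate channel on the same file, inside the same JVM.
                    try (FileChannel otherChannel = FileChannel.open(path, StandardOpenOption.WRITE)) {
                        FileLock second = otherChannel.tryLock();
                        if (second != null) {
                            second.release(); // the lock was granted; refused stays false
                        }
                    } catch (OverlappingFileLockException e) {
                        refused.set(true); // the JVM refused the overlapping lock
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                other.start();
                other.join(); // join() makes B's write to 'refused' visible here
            }
            return refused.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (lockFile != null) {
                try {
                    Files.deleteIfExists(lockFile);
                } catch (IOException ignored) {
                    // best-effort cleanup of the temp file
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("second thread refused: " + secondThreadIsRefused());
    }
}
```

The `FileLock` Javadoc notes that file locks are held on behalf of the entire Java virtual machine and are not suitable for controlling access to a file by multiple threads within the same JVM, and that on some systems closing any channel on a file releases all of the JVM's locks on it. Both points are consistent with the caution above about relying on `FileLock` alone to keep one stream thread from deleting a directory that another thread is using.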
> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Damian Guy
> Attachments: logs2.tar.gz, logs3.tar.gz, logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
>     at org.rocksdb.RocksDB.put(Native Method)
>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
>     ... 27 common frames omitted
> {code}
> Also
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>     ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
>     at org.rocksdb.RocksDB.flush(Native Method)
>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating system info:
> Distributor ID: Debian
> Description: Debian GNU/Linux 8.7 (jessie)
> Release: 8.7
> Codename: jessie
> uname: 3.16.0-4-amd64
>
> FWIW, I don't see anything obvious and I can't reproduce it.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)