Date: Fri, 1 Jan 2016 15:18:39 +0000 (UTC)
From: "jin xing (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Comment Edited] (KAFKA-2944) NullPointerException in KafkaConfigStorage when config storage starts right before shutdown request

    [ https://issues.apache.org/jira/browse/KAFKA-2944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076312#comment-15076312 ]

jin xing edited comment on KAFKA-2944 at 1/1/16 3:18 PM:
---------------------------------------------------------

I cannot reproduce this and believe it is a transient failure. The relevant KafkaConfigStorage code is below:

    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            ...
            else if (record.key().startsWith(TASK_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
                if (deferred == null) {
                    deferred = new HashMap<>();
                    deferredTaskUpdates.put(taskId.connector(), deferred);
                }
                deferred.put(taskId, (Map<String, String>) newTaskConfig);
            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
                int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
                Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
                Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
                if (!completeTaskIdSet(taskIdSet, newTaskCount)) // NullPointerException comes out from here
                {
                    ....
                }
            }

Since the method "halt()" in DistributedHerder has not executed yet, I believe that shutdown is not the cause. In KafkaConfigStorage::putTaskConfigs, if sending the messages with TASK_PREFIX fails but sending the message with COMMIT_TASKS_PREFIX succeeds, deferredTaskUpdates will not have a corresponding key for the connector. So it makes sense to call 'flush' after sending each connector or task configuration message to KafkaBasedLog.


was (Author: jinxing6042@126.com):
I cannot reproduce this and believe it is a transient failure. The relevant KafkaConfigStorage code is below:

    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            ...
            else if (record.key().startsWith(TASK_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
                if (deferred == null) {
                    deferred = new HashMap<>();
                    deferredTaskUpdates.put(taskId.connector(), deferred);
                }
                deferred.put(taskId, (Map<String, String>) newTaskConfig);
            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
                Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
                int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
                Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
                Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
                if (!completeTaskIdSet(taskIdSet, newTaskCount)) { // NullPointerException comes out from here
                    ....
                }
            }

Since the method "halt()" in DistributedHerder has not executed yet, I believe that shutdown is not the cause. In KafkaConfigStorage::putTaskConfigs, if sending the messages with TASK_PREFIX fails but sending the message with COMMIT_TASKS_PREFIX succeeds, deferredTaskUpdates will not have a corresponding key for the connector. So it makes sense to call 'flush' after sending each connector or task configuration message to KafkaBasedLog.


> NullPointerException in KafkaConfigStorage when config storage starts right before shutdown request
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2944
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2944
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> Relevant log where you can see a config update starting, then the request to shutdown happens and we end up with a NullPointerException:
> {quote}
> [2015-12-03 09:12:55,712] DEBUG Change in connector task count from 2 to 3, writing updated task configurations (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:56,224] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> [2015-12-03 09:12:56,224] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,227] INFO Stopped ServerConnector@10cb550e{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector)
> [2015-12-03 09:12:56,234] INFO Stopped o.e.j.s.ServletContextHandler@3f8a24d5{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
> [2015-12-03 09:12:56,235] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,235] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:58,209] ERROR Unexpected exception in KafkaBasedLog's work thread (org.apache.kafka.connect.util.KafkaBasedLog)
> java.lang.NullPointerException
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.completeTaskIdSet(KafkaConfigStorage.java:558)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.access$1200(KafkaConfigStorage.java:143)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:476)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:372)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:235)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:275)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.access$300(KafkaBasedLog.java:70)
> 	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:307)
> [2015-12-03 09:13:26,704] ERROR Failed to write root configuration to Kafka: (org.apache.kafka.connect.storage.KafkaConfigStorage)
> java.util.concurrent.TimeoutException: Timed out waiting for future
> 	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> 	at java.lang.Thread.run(Thread.java:745)
> [2015-12-03 09:13:26,704] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.errors.ConnectException: Error writing root configuration to Kafka
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:355)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
> 	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
> 	... 8 more
> {quote}
> I'm not certain that the issue is specifically due to shutting down (the KafkaConfigStorage.stop() hasn't been invoked yet when this occurs, so the underlying KafkaBasedLog is still running, although shutdown of the entire process has started), but this has only shown up during shutdown so far.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
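
For illustration only, the failure mode described in the comment above (a COMMIT_TASKS record being consumed when no TASK records were buffered for that connector) can be modeled in a few lines. This is a minimal sketch under stated assumptions, not the actual KafkaConfigStorage code and not necessarily the fix the project adopted: the class DeferredTaskSketch, its methods onTaskRecord and onCommitRecord, and the use of plain integer task indexes in place of ConnectorTaskId are all hypothetical. It shows one possible defensive treatment, where a missing task id set is treated as an incomplete configuration instead of being dereferenced.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model of the deferred task-config bookkeeping (not the real KafkaConfigStorage). */
class DeferredTaskSketch {
    // connector name -> (task index -> task config), filled as TASK records are consumed
    private final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();

    /** Models consuming a TASK record: buffer the config until a commit record arrives. */
    void onTaskRecord(String connector, int taskIndex, Map<String, String> config) {
        deferredTaskUpdates.computeIfAbsent(connector, k -> new HashMap<>()).put(taskIndex, config);
    }

    /** Null-safe completeness check: a missing set is complete only when zero tasks are expected. */
    static boolean completeTaskIdSet(Set<Integer> taskIds, int expectedCount) {
        if (taskIds == null) {
            return expectedCount == 0;
        }
        if (taskIds.size() != expectedCount) {
            return false;
        }
        for (int i = 0; i < expectedCount; i++) {
            if (!taskIds.contains(i)) {
                return false;
            }
        }
        return true;
    }

    /** Models consuming a COMMIT_TASKS record; returns true only if the buffered configs were complete. */
    boolean onCommitRecord(String connector, int newTaskCount) {
        // get() returns null when no TASK record was ever buffered for this connector
        Map<Integer, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
        Set<Integer> taskIds = (deferred == null) ? null : new TreeSet<>(deferred.keySet());
        if (!completeTaskIdSet(taskIds, newTaskCount)) {
            return false; // incomplete configuration: skip instead of throwing
        }
        deferredTaskUpdates.remove(connector);
        return true;
    }

    public static void main(String[] args) {
        DeferredTaskSketch storage = new DeferredTaskSketch();
        Map<String, String> emptyConfig = Collections.emptyMap();

        // Commit arrives although the TASK writes never made it into the log.
        System.out.println(storage.onCommitRecord("conn", 3)); // prints false

        // All three TASK records arrive, then the commit.
        storage.onTaskRecord("conn", 0, emptyConfig);
        storage.onTaskRecord("conn", 1, emptyConfig);
        storage.onTaskRecord("conn", 2, emptyConfig);
        System.out.println(storage.onCommitRecord("conn", 3)); // prints true
    }
}
```

A guard like this only hides the symptom on the reading side; the flush suggested in the comment targets the writing side, so that a commit record is not sent unless the preceding task records were written.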