kafka-dev mailing list archives

From "jin xing (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-2944) NullPointerException in KafkaConfigStorage when config storage starts right before shutdown request
Date Fri, 01 Jan 2016 15:16:40 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-2944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076312#comment-15076312 ]

jin xing commented on KAFKA-2944:
---------------------------------

I cannot reproduce this; I believe it is a transient failure.
The relevant KafkaConfigStorage code is as follows:
private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
    public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
        ...
        else if (record.key().startsWith(TASK_PREFIX)) {
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
            if (deferred == null) {
                deferred = new HashMap<>();
                deferredTaskUpdates.put(taskId.connector(), deferred);
            }
            deferred.put(taskId, (Map<String, String>) newTaskConfig);
        } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);

            int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
            Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
            if (!completeTaskIdSet(taskIdSet, newTaskCount)) { // NullPointerException comes out from here
                ....
            }
        }
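A minimal sketch of the failure path, assuming (as the stack trace suggests) that completeTaskIdSet dereferences the set it is given, and that this set is null when no task records were read for the connector before its commit record arrived. The class name and completeTaskIdSetNullSafe are hypothetical, the map is a simplified stand-in for deferredTaskUpdates, and the null guard is only one possible defensive fix, not the one adopted for this issue.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CommitWithoutTaskRecords {
    // Hypothetical null-safe variant of completeTaskIdSet. Assumes the check is
    // "task ids 0..expectedSize-1 are all present"; a null set (no task records
    // seen for the connector) is reported as incomplete instead of throwing.
    static boolean completeTaskIdSetNullSafe(Set<Integer> idSet, int expectedSize) {
        if (idSet == null || idSet.size() < expectedSize) {
            return false;
        }
        for (int i = 0; i < expectedSize; i++) {
            if (!idSet.contains(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simplified stand-in for deferredTaskUpdates: connector name -> task ids read so far.
        Map<String, Set<Integer>> taskIdsByConnector = new HashMap<>();

        // All three task records for "connector-a" were read, so its commit checks normally.
        Set<Integer> seen = new HashSet<>();
        seen.add(0);
        seen.add(1);
        seen.add(2);
        taskIdsByConnector.put("connector-a", seen);
        System.out.println(completeTaskIdSetNullSafe(taskIdsByConnector.get("connector-a"), 3)); // true

        // A commit record for "connector-b" arrives with no task records before it:
        // get() returns null, which is where an unguarded check would throw.
        System.out.println(completeTaskIdSetNullSafe(taskIdsByConnector.get("connector-b"), 3)); // false
    }
}
```

Treating the missing entry as "incomplete" keeps the consumer thread alive and leaves the commit unapplied, rather than killing KafkaBasedLog's work thread.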
Since DistributedHerder.halt() has not executed yet, I believe this is not a shutdown issue.
In KafkaConfigStorage.putTaskConfigs, if sending the task (TASK_PREFIX) messages fails but sending the COMMIT_TASKS_PREFIX message succeeds, deferredTaskUpdates will have no entry for that connector.
So it makes sense to call 'flush' after sending each connector or task configuration message to KafkaBasedLog.
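A sketch of the ordering the proposed flush is meant to enforce: write every task record, wait until each one is acknowledged, and only then write the commit record, so a failed task write can never leave a commit without its tasks. ConfigLog, FakeLog, the "task-"/"commit-" key formats and this putTaskConfigs signature are all hypothetical and are not the KafkaBasedLog API; waiting on per-record futures stands in for a flush call here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FlushBeforeCommit {
    // Hypothetical minimal log abstraction (not the real KafkaBasedLog API).
    interface ConfigLog {
        CompletableFuture<Void> send(String key, String value);
    }

    // In-memory fake that fails every send whose key starts with failPrefix.
    static class FakeLog implements ConfigLog {
        final List<String> written = new ArrayList<>();
        private final String failPrefix; // null means never fail

        FakeLog(String failPrefix) {
            this.failPrefix = failPrefix;
        }

        @Override
        public CompletableFuture<Void> send(String key, String value) {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            if (failPrefix != null && key.startsWith(failPrefix)) {
                ack.completeExceptionally(new RuntimeException("send failed: " + key));
            } else {
                written.add(key);
                ack.complete(null);
            }
            return ack;
        }
    }

    // Returns true only if every task record and then the commit record were acknowledged.
    static boolean putTaskConfigs(ConfigLog log, String connector, Map<Integer, String> taskConfigs) {
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (Map.Entry<Integer, String> e : taskConfigs.entrySet()) {
            acks.add(log.send("task-" + connector + "-" + e.getKey(), e.getValue()));
        }
        try {
            // Block on every task ack first; join() throws CompletionException on failure,
            // so the commit record below is skipped if any task write failed.
            for (CompletableFuture<Void> ack : acks) {
                ack.join();
            }
            log.send("commit-" + connector, Integer.toString(taskConfigs.size())).join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> configs = new LinkedHashMap<>();
        for (int i = 0; i < 3; i++) {
            configs.put(i, "config-" + i);
        }

        FakeLog healthy = new FakeLog(null);
        // prints: true [task-my-connector-0, task-my-connector-1, task-my-connector-2, commit-my-connector]
        System.out.println(putTaskConfigs(healthy, "my-connector", configs) + " " + healthy.written);

        FakeLog broken = new FakeLog("task-");
        // prints: false []
        System.out.println(putTaskConfigs(broken, "my-connector", configs) + " " + broken.written);
    }
}
```

The trade-off is latency: the commit now waits for a full round trip on every task record, in exchange for never publishing a commit that refers to task configs the reader has not seen.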

> NullPointerException in KafkaConfigStorage when config storage starts right before shutdown request
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2944
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2944
>             Project: Kafka
>          Issue Type: Bug
>          Components: copycat
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>
> Relevant log where you can see a config update starting, then the request to shut down happens, and we end up with a NullPointerException:
> {quote}
> [2015-12-03 09:12:55,712] DEBUG Change in connector task count from 2 to 3, writing updated task configurations (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:56,224] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> [2015-12-03 09:12:56,224] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,227] INFO Stopped ServerConnector@10cb550e{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector)
> [2015-12-03 09:12:56,234] INFO Stopped o.e.j.s.ServletContextHandler@3f8a24d5{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
> [2015-12-03 09:12:56,235] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer)
> [2015-12-03 09:12:56,235] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2015-12-03 09:12:58,209] ERROR Unexpected exception in KafkaBasedLog's work thread (org.apache.kafka.connect.util.KafkaBasedLog)
> java.lang.NullPointerException
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.completeTaskIdSet(KafkaConfigStorage.java:558)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.access$1200(KafkaConfigStorage.java:143)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:476)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage$1.onCompletion(KafkaConfigStorage.java:372)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:235)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:275)
> 	at org.apache.kafka.connect.util.KafkaBasedLog.access$300(KafkaBasedLog.java:70)
> 	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:307)
> [2015-12-03 09:13:26,704] ERROR Failed to write root configuration to Kafka:  (org.apache.kafka.connect.storage.KafkaConfigStorage)
> java.util.concurrent.TimeoutException: Timed out waiting for future
> 	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> 	at java.lang.Thread.run(Thread.java:745)
> [2015-12-03 09:13:26,704] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.errors.ConnectException: Error writing root configuration to Kafka
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:355)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:737)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:677)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:673)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
> 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
> 	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:74)
> 	at org.apache.kafka.connect.storage.KafkaConfigStorage.putTaskConfigs(KafkaConfigStorage.java:352)
> 	... 8 more
> {quote}
> I'm not certain that the issue is specifically due to shutting down (KafkaConfigStorage.stop() hasn't been invoked yet when this occurs, so the underlying KafkaBasedLog is still running, although shutdown of the entire process has started), but so far this has only shown up during shutdown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
