incubator-kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Neha Narkhede (Updated) (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
Date Sun, 08 Apr 2012 02:02:16 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Checked into trunk and 0.8 branch
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch,
kafka-320.patch
>
>
> The testZKSendWithDeadBroker inside ProducerTest fails intermittently with the following
exception -
> [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
> java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0.
This probably indicates that you either have configured a brokerid that is already in use,
or else you have shutdown this broker and restarted it faster than the zookeeper timeout so
it appears to be re-registering.
>         at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
>         at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
>         at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
>         at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at junit.framework.TestCase.runTest(TestCase.java:164)
>         at junit.framework.TestCase.runBare(TestCase.java:130)
>         at junit.framework.TestResult$1.protect(TestResult.java:110)
>         at junit.framework.TestResult.runProtected(TestResult.java:128)
>         at junit.framework.TestResult.run(TestResult.java:113)
>         at junit.framework.TestCase.run(TestCase.java:120)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>         at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
>         at sbt.TestRunner.run(TestFramework.scala:53)
>         at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>         at sbt.TestRunner.run(TestFramework.scala:76)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.NamedTestTask.run(TestFramework.scala:92)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>         at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>         at sbt.impl.RunTask.runTask(RunTask.scala:85)
>         at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Control$.trapUnit(Control.scala:19)
>         at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
> The test basically restarts a server and fails with this exception during the restart
> This is unexpected, since server1, after shutting down, should trigger the deletion of
its registration of the broker id from ZK. But, here is the Kafka bug causing this problem
-
> In the test during server1.shutdown(), we do close the zkClient associated with the broker
and it successfully deletes the broker's registration info from Zookeeper. After this, server1
can be succesfully started. Then the test completes and in the teardown(), we call server1.shutdown().
During this, the server doesn't really shutdown, since it is protected with the isShuttingDown
variable, which was never set to false in the startup() API. Now, this leads to an open zkclient
connection for the current test run. 
> If you try to re-run ProducerTest without exiting sbt, it will first bring up the zookeeper
server. Then, since the kafka server during the previous run is still running, it can succesfully
renew its session with zookeeper, and retain the /brokers/ids/0 ephemeral node. If it does
this before server1.startup() is called in the test, the test will fail.
> The fix is to set the shutdown related variables correctly in the startup API of KafkaServer.
Also, during debugging this, I found that we don't close zkclient in the Producer as well.
Due to this, unit tests throw a whole bunch of WARN that look like - 
> [2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182
(org.apache.zookeeper.ClientCnxn:1061)
> [2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected
error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message