kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yang Ye (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-369) remove ZK dependency on producer
Date Sat, 04 Aug 2012 02:28:03 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Yang Ye updated KAFKA-369:
--------------------------

    Attachment: kafka_369_v2.diff


In AsyncProducerTest.testFailedSendRetryLogic(), we may see following error messges, it won't
affect the success of the test. It's because in handle() function of DefaultEventHandler,

        if (outstandingProduceRequests.size > 0)  {
          // back off and update the topic metadata cache before attempting another send operation
          Thread.sleep(config.producerRetryBackoffMs)
          // get topics of the outstanding produce requests and refresh metadata for those
         Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))

it will refresh the cached metadata, but the broker is not up.

This doesn't affect the correctness of the test. Maybe we need try to eliminate the error
messages.  





[2012-08-03 19:11:44,182] ERROR Connection attempt to 127.0.0.1:52955 failed, next attempt
in 100 ms (kafka.producer.SyncProducer:99)
java.net.ConnectException: Connection refused
	at sun.nio.ch.Net.connect(Native Method)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:182)
	at kafka.producer.SyncProducer.doSend(SyncProducer.scala:74)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:116)
	at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:86)
	at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:81)
	at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
	at scala.collection.immutable.List.foreach(List.scala:45)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:44)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:81)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:53)
	at kafka.utils.Utils$.swallow(Utils.scala:429)
	at kafka.utils.Logging$class.swallowError(Logging.scala:102)
	at kafka.utils.Utils$.swallowError(Utils.scala:40)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
	at kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:438)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at junit.framework.TestCase.runTest(TestCase.java:164)
	at junit.framework.TestCase.runBare(TestCase.java:130)
	at junit.framework.TestResult$1.protect(TestResult.java:110)
	at junit.framework.TestResult.runProtected(TestResult.java:128)
	at junit.framework.TestResult.run(TestResult.java:113)
	at junit.framework.TestCase.run(TestCase.java:120)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:71)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:199)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
                
> remove ZK dependency on producer
> --------------------------------
>
>                 Key: KAFKA-369
>                 URL: https://issues.apache.org/jira/browse/KAFKA-369
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>         Attachments: kafka_369_v1.diff, kafka_369_v2.diff
>
>   Original Estimate: 252h
>  Remaining Estimate: 252h
>
> Currently, the only place that ZK is actually used is in BrokerPartitionInfo. We use
ZK to get a list of brokers for making TopicMetadataRequest requests. Instead, we can provide
a list of brokers in the producer config directly. That way, the producer client is no longer
dependant on ZK.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message