kafka-dev mailing list archives

From "Jun Rao (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchronous
Date Fri, 06 Jan 2012 06:03:39 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13181141#comment-13181141 ]

Jun Rao commented on KAFKA-202:
-------------------------------

Overall, the patch looks good. Some comments:

1. KafkaServer.startup doesn't need to catch the exception and shut down. The caller in KafkaServerStartable
already does that, and it also shuts down the embedded consumer appropriately if needed.

2. The patch includes a leftover KafkaRequestHandlers.scala.rej file.

3. The unit test seems to fail occasionally with the following error:
[info] == core-kafka / kafka.integration.LazyInitProducerTest ==
[2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 (kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
	at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
	at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
	at kafka.log.Log.append(Log.scala:215)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
	at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
	at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
	at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
	at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
	at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
	at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
	at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
	at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
	at kafka.log.Log.append(Log.scala:215)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
	at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
	at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
	at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
	at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
	at kafka.log.Log.append(Log.scala:215)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
	at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
	at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
	at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
	at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
	at kafka.log.Log.append(Log.scala:215)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
	at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
	at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
	at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
	at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
	at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
	at java.lang.Thread.run(Thread.java:662)
[info] Test Starting: testProduceAndFetch(kafka.integration.LazyInitProducerTest)
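
Every trace above bottoms out in FileChannelImpl.ensureOpen, which is what a write to an already-closed FileChannel looks like. One plausible reading, offered as an assumption rather than a diagnosis, is that the log's channel was closed (for example by test teardown) while a request handler thread was still appending. The sketch below only reproduces the exception itself, using a temporary file; ClosedChannelSketch and writeAfterCloseFails are hypothetical names and nothing here is Kafka code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelSketch {

    // Returns true if writing to a closed FileChannel raises ClosedChannelException.
    static boolean writeAfterCloseFails() throws IOException {
        Path tmp = Files.createTempFile("closed-channel-sketch", ".log");
        try {
            FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE);

            // A write on an open channel succeeds.
            channel.write(ByteBuffer.wrap("first".getBytes(StandardCharsets.UTF_8)));

            // Stand-in for whatever closed the log file in the failing test (assumption).
            channel.close();

            try {
                // A late append, like the one a handler thread might still attempt.
                channel.write(ByteBuffer.wrap("second".getBytes(StandardCharsets.UTF_8)));
                return false;
            } catch (ClosedChannelException e) {
                return true;
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("write after close failed: " + writeAfterCloseFails());
    }
}
```

If the race reading is right, the fix would lie in ordering shutdown so that handler threads stop before the logs are closed, but the trace alone does not confirm that.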

                
> Make the request processing in kafka asynchronous
> -------------------------------------------------
>
>                 Key: KAFKA-202
>                 URL: https://issues.apache.org/jira/browse/KAFKA-202
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, KAFKA-48-socket-server-refactor-draft.patch
>
>
> We need to handle long-lived requests to support replication. To make this work we need
> to make the processing mechanism asynchronous from the network threads.
> To accomplish this we will retain the existing pool of network threads but add a new
> pool of request handling threads. These will do all the disk I/O. There will be a queue mechanism
> to transfer requests to and from this secondary pool.
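
The design described in the issue (network threads hand requests to a separate handler pool through a queue, and responses come back the same way) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the patch: RequestPipelineSketch, Request, Response, roundTrip and the SHUTDOWN sentinel are hypothetical names, the calling thread stands in for a network thread, and the "disk I/O" is simulated with a string operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestPipelineSketch {

    // Hypothetical request/response types; these are not Kafka classes.
    static final class Request {
        final int id;
        final String payload;
        Request(int id, String payload) { this.id = id; this.payload = payload; }
    }

    static final class Response {
        final int id;
        final String result;
        Response(int id, String result) { this.id = id; this.result = result; }
    }

    // Sentinel that tells a handler thread to exit (compared by identity).
    static final Request SHUTDOWN = new Request(-1, "");

    static List<Response> roundTrip(int numRequests, int numHandlers) throws InterruptedException {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Response> responseQueue = new LinkedBlockingQueue<>();
        ExecutorService handlers = Executors.newFixedThreadPool(numHandlers);

        // Handler pool: takes requests off the queue, does the slow work, queues the response.
        for (int i = 0; i < numHandlers; i++) {
            handlers.execute(() -> {
                try {
                    while (true) {
                        Request req = requestQueue.take();
                        if (req == SHUTDOWN) {
                            return;
                        }
                        // Stand-in for the disk I/O a real handler would perform.
                        responseQueue.put(new Response(req.id, "handled:" + req.payload));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // "Network thread" role: enqueue requests without waiting on their processing.
        for (int i = 0; i < numRequests; i++) {
            requestQueue.put(new Request(i, "msg-" + i));
        }

        // Later, the network side picks up completed responses to send back.
        List<Response> responses = new ArrayList<>();
        for (int i = 0; i < numRequests; i++) {
            responses.add(responseQueue.take());
        }

        // One sentinel per handler so every thread exits.
        for (int i = 0; i < numHandlers; i++) {
            requestQueue.put(SHUTDOWN);
        }
        handlers.shutdown();
        handlers.awaitTermination(5, TimeUnit.SECONDS);
        return responses;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Response r : roundTrip(5, 2)) {
            System.out.println(r.id + " -> " + r.result);
        }
    }
}
```

With more than one handler, responses may arrive in a different order than the requests were queued, so a real implementation would need some way to route each response back to its connection.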

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
