incubator-kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John Fung (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-200) Support configurable send / receive socket buffer size in server
Date Thu, 15 Dec 2011 22:45:33 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13170544#comment-13170544
] 

John Fung commented on KAFKA-200:
---------------------------------

===============
Background Informations
===============

1. Socket.setReceiveBufferSize() has restriction specified in Java API doc : "For client sockets,
setReceiveBufferSize() must be called before connecting the socket to its remote peer."
- http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29

2. Socket.setSendBufferSize() does not have corresponding restriction as setReceiveBufferSize()
- http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setSendBufferSize%28int%29

3. SocketChannel.socket() returns Socket object which is needed to provide both setReceiveBufferSize()
and setSendBufferSize() methods (However, SocketChannel is available only after the connection
is accepted)

4. ServerSocketChannel.socket() returns ServerSocket object which provides only setReceiveBufferSize()
but *no* setSendBufferSize() method (ServerSocketChannel is available *before* the connection
is accepted)

5. Hence, ServerSocketChannel object is available to provide setReceiveBufferSize before calling
ServerSocketChannel.accept() which would satisfy the restriction specified in item 1

The above 5 items are background informations which lead to the following 3 different approaches
to set send/receive buffer sizes in Kafka (tested in Linux).

It appears that Approach 3 is the most appropriate in this scenario.

============================
Approach 1 - KAFKA-200.patch (existing patch)

1. call setReceiveBufferSize after accept()
2. call setSendBufferSize after accept()
============================

def accept(key: SelectionKey, processor: Processor) {
    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)
    socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
     
    processor.accept(socketChannel)
}

=====
Results:
=====
1. In Linux machine, seeing both send / receive buffer socket sizes set to the expected values.
2. It doesn't comply with the restriction specified by Java API doc

============================
Approach 2:

1. call setReceiveBufferSize before accept()
2. call setSendBufferSize before accept()
============================

def accept(key: SelectionKey, processor: Processor) {
      val socketChannel = key.channel().asInstanceOf[SocketChannel]
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
      socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
      socketChannel.asInstanceOf[ServerSocketChannel].accept()
     
      processor.accept(socketChannel)
}

=====
Results:
=====
1. Compilation OK.
2. Runtime error for casting:

[2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
java.lang.ClassCastException
[2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
java.lang.ClassCastException
. . .


============================
Approach 3:

1. call setReceiveBufferSize before accept()
    (complying with restriction specified in http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29)

2. call setSendBufferSize after accept()
    (No corresponding restriction to call before accept() )
============================

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.configureBlocking(false)
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
   
    val socketChannel = serverSocketChannel.accept()
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    processor.accept(socketChannel)
}

=====
Results:
=====
1. Comply with the restriction specified in Java API doc
2. Both sendBufferSize and receiveBufferSize are set to the expected values:
    [2011-12-15 21:46:46,525] DEBUG sendBufferSize: [1048576] receiveBufferSize: [1048576]
(kafka.network.Acceptor)
                
> Support configurable send / receive socket buffer size in server
> ----------------------------------------------------------------
>
>                 Key: KAFKA-200
>                 URL: https://issues.apache.org/jira/browse/KAFKA-200
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 0.7
>            Reporter: John Fung
>             Fix For: 0.8
>
>         Attachments: KAFKA-200.patch
>
>
> * Make the send / receive socket buffer size configurable in server.
> * KafkaConfig.scala already has the following existing variables to support send / receive
buffer:
>     socketSendBuffer
>     socketReceiveBuffer
> * The patch attached to this ticket will read the following existing settings in <kafka>/config/server.properties
and set the corresponding socket buffers
>     . . .
>     # The send buffer (SO_SNDBUF) used by the socket server
>     socket.send.buffer=1048576
>     # The receive buffer (SO_RCVBUF) used by the socket server
>     socket.receive.buffer=1048576

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message