kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jun Rao" <jun...@gmail.com>
Subject Re: Review Request 33620: Patch for KAFKA-1690
Date Mon, 03 Aug 2015 16:50:36 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93862
-----------------------------------------------------------


Thanks for the patch. A few more comments below.


build.gradle (lines 247 - 249)
<https://reviews.apache.org/r/33620/#comment148282>

    As Ismael mentioned, we got rid of scala 2.9. So this is not needed.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 151 -
152)
<https://reviews.apache.org/r/33620/#comment148279>

    We probably need to try/catch IOException as we do above?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 234 -
239)
<https://reviews.apache.org/r/33620/#comment148265>

    If handshakeStatus is NEED_UNWRAP and write is true, we will fall through to the next
case. However, there may still be unflushed data. flush() won't be called when write is true.
Perhaps the check for write is unnecessary since (a) flush() always handles the case when
write is false; (b) since we may have done a flush in line 220 and the writable status could
have changed after that, which makes the value in write stale.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 262 -
269)
<https://reviews.apache.org/r/33620/#comment148268>

    Could we transition from NEED_WRAP to NOT_HANDSHAKING directly? Or NOT_HANDSHAKING can
only be transitioned from FINIHED state?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 306 -
320)
<https://reviews.apache.org/r/33620/#comment148281>

    It seems that the logic here can be simpler. In handshake(), we call flush at the beginning.
So, it seems that when handshakeFinished(), it should always be the case that there are no
remaining bytes in netWriteBuffer. So, in handshakeFinished(), it seems that we can just simply
set handshakeComplete to true and turn off OP_WRITE. Also, not sure if we need to check handshakeResult.getHandshakeStatus().



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 357 -
360)
<https://reviews.apache.org/r/33620/#comment148267>

    Is this correct? After netReadBuffer.compact in line 375, limit is set to capacity and
position is set to first unread byte. The only case when they can be equal is that we get
a full capacity worth of bytes and don't read any byte during unwrap. In this case, we shouldn't
empty the buffer.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 368 -
381)
<https://reviews.apache.org/r/33620/#comment148266>

    If handshake status is BUFFER_OVERFLOW, we will return to the caller and then to the selector.
However, we may have read all incoming bytes into netReadBuffer. So, the key may never be
selected again to complete the handshake. It seems that this case can never happen during
handshake since we don't expect to use the appReadBuffer. Perhaps we can just assert that
state is illegal when handling NEED_UNWRAP in handshake().



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 409)
<https://reviews.apache.org/r/33620/#comment148274>

    Agreed with Dong: Maybe change to "if (netread <= 0) return netread"?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (line 417)
<https://reviews.apache.org/r/33620/#comment148276>

    It's still not very clear to me how renegotiation can be supported in the middle of sends/receives.
Suppose that the server initiates a handshake. This may involve the server sending some handshake
bytes to the client. After this point, the server expects to read handshake bytes from the
client. However, the client may still be sending some regular bytes over the socket.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java (lines 430 -
433)
<https://reviews.apache.org/r/33620/#comment148275>

    Is this needed? If we need to expand appReadBuffer, netReadBuffer's position won't be
0 and we can just loop back.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 75)
<https://reviews.apache.org/r/33620/#comment148269>

    Could we add a comment on why we need to maintain this map?



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 247 - 248)
<https://reviews.apache.org/r/33620/#comment148272>

    It seems that we will need to further check whether those channels in stagedReceives are
muted or not.  Timeout should only be 0 if there is at least one unmuted channel in stagedReceives.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 281 - 287)
<https://reviews.apache.org/r/33620/#comment148273>

    To avoid having to buffer unbounded number of receives in staged receives, perhaps we
can choose not to read from the channel if there exist staged receives for a channel. This
will help protect the server from running out of memory due to a bursty producer client.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 320 - 323)
<https://reviews.apache.org/r/33620/#comment148270>

    There seems to be a problem with this. When there are ready keys, but those ready keys
don't include keys with stagedReceives, we still want to return one receive for those keys
that are not ready, but with stagedReceives. So, it seems that the logic will be (1) process
all ready keys as before, but just add the received to stagedReceivesl; (2) for every key
in stagedReceives, add the first receive to completedReceives if the key is not muted.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 502)
<https://reviews.apache.org/r/33620/#comment148271>

    Not sure why we need to check hasSend. It's possible for a channel to have both sends
and receives at the same time since the NetworkClient supports pipelining.



core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala (line 202)
<https://reviews.apache.org/r/33620/#comment148355>

    I am not sure that we need this test. The callback logic is in the consumer and is independent
of the socket.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (lines 113 - 122)
<https://reviews.apache.org/r/33620/#comment148348>

    It doesn't seem that we need to test this here since this is not SSL specific.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 179)
<https://reviews.apache.org/r/33620/#comment148349>

    This test seems unnecessary since the partitioning logic is SSL independent.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 233)
<https://reviews.apache.org/r/33620/#comment148350>

    This test seems unnecessary. Auto topic creation logic is SSL independent.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 257)
<https://reviews.apache.org/r/33620/#comment148351>

    Not sure if this test is useful either since the exercised logic is in the producer, not
in the socket layer.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 301)
<https://reviews.apache.org/r/33620/#comment148354>

    Not sure if this test is useful either since the exercised logic is in the producer, not
in the socket layer.



core/src/test/scala/unit/kafka/network/SocketServerTest.scala (line 194)
<https://reviews.apache.org/r/33620/#comment148356>

    Is this test needed given the tests we have on EchoServer?
    
    Also, do we have a test where the broker listens to multiple ports? This can be added
in a followup patch though.


- Jun Rao


On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> -----------------------------------------------------------
> 
> (Updated July 25, 2015, 7:11 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
>     https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with
the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with
the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> KAFKA-1690. added staged receives to selector.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Diffs
> -----
> 
>   build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a

>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b

>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b

>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 70377ae2fa46deb381139d28590ce6d4115e1adc

>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java bea3d737c51be77d5b5293cdd944d33b905422ba

>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7

>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b

>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc

>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b

>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80

>   clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 618a0fa53848ae6befea7eba39c2f3285b734494

>   clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598

>   clients/src/main/java/org/apache/kafka/common/network/Send.java 8f6daadf6b67c3414911cda77765512131e56fd3

>   clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java dab1a94dd29563688b6ecf4eeb0e180b06049d3f

>   clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java PRE-CREATION

>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java af9993cf9b3991f1e61e1201c94e19bc1bf76a68

>   clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java 13ce519f03d13db041e1f2dbcd6b59414d2775b8

>   clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java f3f8334f848be4cc043d5a573975609a3681fe7e

>   clients/src/test/java/org/apache/kafka/common/network/EchoServer.java PRE-CREATION

>   clients/src/test/java/org/apache/kafka/common/network/SSLFactoryTest.java PRE-CREATION

>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java PRE-CREATION

>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 158f9829ff64a969008f699e40c51e918287859e

>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java e7951d835472e5defe49be435f2c93685ba544d5

>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 51eb9d142f566c94a87add68b8c4f78b56d6ec3e

>   clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION 
>   core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215

>   core/src/main/scala/kafka/network/SocketServer.scala dbe784b63817fd94e1593136926db17fac6fa3d7

>   core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf

>   core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3

>   core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala d8eee52fc750e23c06c1f06f03b96980d9865a32

>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9ce4bd5ee130ce3cb252b2883a3fd3c9acd742a5

>   core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 8b14bcfe7af601fe4b0fb0a7c0c544e87403062a

>   core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala e4bf2df48dd59a251b646b7f96d63ec4b924fc0b

>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 7dc2fad542ea553ee888543dd215eb41ea57d509

>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 04a02e08a54139ee1a298c5354731bae009efef3

>   core/src/test/scala/unit/kafka/utils/TestUtils.scala eb169d8b33c27d598cc24e5a2e5f78b789fa38d3

> 
> Diff: https://reviews.apache.org/r/33620/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message