kafka-dev mailing list archives

From "Rajini Sivaram (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5345) Some socket connections not closed after restart of Kafka Streams
Date Fri, 02 Jun 2017 14:54:04 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-5345:
----------------------------------
    Fix Version/s: 0.10.2.2

> Some socket connections not closed after restart of Kafka Streams
> -----------------------------------------------------------------
>
>                 Key: KAFKA-5345
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5345
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: MacOs 10.12.5 and Ubuntu 14.04
>            Reporter: Jeroen van Wilgenburg
>            Assignee: Rajini Sivaram
>             Fix For: 0.11.0.0, 0.10.2.2
>
>
> We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
> This problem only occurs with versions {{0.10.2.1}} and {{0.10.2.0}}.
> {{0.10.1.1}} and {{0.10.1.0}} both work as expected.
> I used the same version for the server and client.
> I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was:
> {noformat}
> #146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
> 	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
> 	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
> 	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
> 	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
> 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
> 	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
> 	at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
> 	at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
> {noformat}	
> 	
> 	
> I could narrow the problem down to the reproducible example below (the only dependency is {{org.apache.kafka:kafka-streams:jar:0.10.2.1}}).
> *IMPORTANT*: You have to run this code in the IntelliJ IDEA debugger with a special breakpoint to see it fail.
> See the comments on the socketChannels variable on how to add this breakpoint.
> When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).
> 	
> {code:title=App.java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import java.nio.channels.SocketChannel;
> import java.nio.channels.spi.AbstractInterruptibleChannel;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> public class App {
>     private static KafkaStreams streams;
>     private static String brokerList;
>     // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170 on 0.10.2.1)
>     // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint.
>     // - Uncheck 'Suspend'
>     // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)'
>     private static final List<SocketChannel> socketChannels = new ArrayList<>();
>     public static void main(String[] args) {
>         brokerList = args[0];
>         init();
>         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
>         Runnable command = () -> {
>             streams.close();
>             System.out.println("Open socketChannels: " + socketChannels.stream()
>                     .filter(AbstractInterruptibleChannel::isOpen)
>                     .collect(Collectors.toList()).size());
>             init();
>         };
>         scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS);
>     }
>     private static void init() {
>         Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
>         StreamsConfig config = new StreamsConfig(streamsConfiguration);
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic");
>         stream.foreach((key, value) -> System.out.println(value));
>         streams = new KafkaStreams(builder, config);
>         streams.start();
>     }
> }
> {code}
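
The reproduction above depends on an IDE breakpoint to collect the SocketChannel instances. As a rough, debugger-free way to watch the same symptom, the sketch below counts open channels the way App.java does and also reads the process-wide file descriptor count from the JVM. It is not part of the original report: the class name OpenFdProbe and both helper methods are hypothetical, and the descriptor count assumes a HotSpot/OpenJDK JVM on a Unix-like system, where the platform bean implements com.sun.management.UnixOperatingSystemMXBean (otherwise the helper returns -1).

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the issue: observes open channels and descriptors.
public class OpenFdProbe {

    /** Returns the number of file descriptors open in this JVM, or -1 if the JVM does not expose it. */
    public static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1L; // assumption: non-Unix or non-HotSpot JVMs may not provide this bean
    }

    /** Counts the channels in the list that have not been closed yet. */
    public static long countOpen(List<SocketChannel> channels) {
        return channels.stream().filter(SocketChannel::isOpen).count();
    }

    public static void main(String[] args) throws IOException {
        List<SocketChannel> channels = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // An unconnected channel still holds a socket descriptor until it is closed.
            channels.add(SocketChannel.open());
        }
        channels.get(0).close();

        System.out.println("Open socketChannels: " + countOpen(channels));
        System.out.println("Open file descriptors: " + openFileDescriptors());

        // Release the remaining descriptors before exiting.
        for (SocketChannel channel : channels) {
            channel.close();
        }
    }
}
```

Printing openFileDescriptors() from the scheduled command in App.java, right after streams.close(), would presumably show a steadily growing number on an affected version. The count covers every descriptor in the process (files, jars, sockets), so only the trend across restarts is meaningful, not the absolute value.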



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
