kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2121; Close internal modules upon client shutdown; reviewed by Ewen Cheslack-Postava and Guozhang Wang
Date Wed, 22 Apr 2015 17:15:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6b964461a -> 01e94e2b4


KAFKA-2121; Close internal modules upon client shutdown; reviewed by Ewen Cheslack-Postava
and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01e94e2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01e94e2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01e94e2b

Branch: refs/heads/trunk
Commit: 01e94e2b4a22eb0dd9ec4c3bbc353d054b1904b1
Parents: 6b96446
Author: Steven Wu <stevenz3wu@gmail.com>
Authored: Wed Apr 22 10:12:45 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Apr 22 10:14:44 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |  16 ++
 .../org/apache/kafka/clients/KafkaClient.java   |   8 +-
 .../kafka/clients/consumer/KafkaConsumer.java   | 178 ++++++++++-------
 .../kafka/clients/producer/KafkaProducer.java   | 192 +++++++++++--------
 .../clients/producer/internals/Sender.java      |   7 +-
 .../apache/kafka/common/metrics/Metrics.java    |   4 +-
 .../common/serialization/Deserializer.java      |   7 +-
 .../kafka/common/serialization/Serializer.java  |   7 +-
 .../clients/consumer/KafkaConsumerTest.java     |  48 +++++
 .../clients/producer/KafkaProducerTest.java     |  49 +++++
 .../apache/kafka/test/MockMetricsReporter.java  |  52 +++++
 11 files changed, 397 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index d0da5d7..0d68bf1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -12,16 +12,21 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
 public class ClientUtils {
+    private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String>
urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
@@ -45,4 +50,15 @@ public class ClientUtils {
             throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
+
+    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable>
firstException) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close " + name, t);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 96ac6d0..1311f85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kafka.common.Node;
@@ -21,7 +22,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 /**
  * The interface for {@link NetworkClient}
  */
-public interface KafkaClient {
+public interface KafkaClient extends Closeable {
 
     /**
      * Check if we are currently ready to send another request to the given node but don't
attempt to connect if we
@@ -130,9 +131,4 @@ public interface KafkaClient {
      */
     public void wakeup();
 
-    /**
-     * Close the client and disconnect from all nodes
-     */
-    public void close();
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2124334..09ecb42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.Coordinator;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -346,6 +348,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Coordinator coordinator;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
 
     private final Time time;
@@ -437,74 +441,97 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                           ConsumerRebalanceCallback callback,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
-        log.debug("Starting the Kafka consumer");
-        if (callback == null)
-            this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
-                                                                  ConsumerRebalanceCallback.class);
-        else
-            this.rebalanceCallback = callback;
-        this.time = new SystemTime();
-        this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-        this.lastCommitAttemptMs = time.milliseconds();
-
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                                                      .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS);
-        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-        String jmxPrefix = "kafka.consumer";
-        if (clientId.length() <= 0)
-          clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        MetricsReporter.class);
-        reporters.add(new JmxReporter(jmxPrefix));
-        this.metrics = new Metrics(metricConfig, reporters, time);
-        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), 0);
-
-        String metricGrpPrefix = "consumer";
-        Map<String, String> metricsTags = new LinkedHashMap<String, String>();
-        metricsTags.put("client-id", clientId);
-        this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags),
-                                        this.metadata,
-                                        clientId,
-                                        100, // a fixed large enough value will suffice
-                                        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                                        config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
-                                        config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
-        this.subscriptions = new SubscriptionState();
-        this.coordinator = new Coordinator(this.client,
-                                           config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                                           this.retryBackoffMs,
-                                           config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                                           config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                                           this.metadata,
-                                           this.subscriptions,
-                                           metrics,
-                                           metricGrpPrefix,
-                                           metricsTags,
-                                           this.time);
-        this.fetcher = new Fetcher<K, V>(this.client,
-                                              this.retryBackoffMs,
-                                              config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
-                                              config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
-                                              config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
-                                              config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
-                                              config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
-                                              keyDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class) : keyDeserializer,
-                                              valueDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class) : valueDeserializer,
-                                              this.metadata,
-                                              this.subscriptions,
-                                              metrics,
-                                              metricGrpPrefix,
-                                              metricsTags,
-                                              this.time);
-
-        config.logUnused();
-
-        log.debug("Kafka consumer created");
+        try {
+            log.debug("Starting the Kafka consumer");
+            if (callback == null)
+                this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+                        ConsumerRebalanceCallback.class);
+            else
+                this.rebalanceCallback = callback;
+            this.time = new SystemTime();
+            this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            this.lastCommitAttemptMs = time.milliseconds();
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                            TimeUnit.MILLISECONDS);
+            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            String jmxPrefix = "kafka.consumer";
+            if (clientId.length() <= 0)
+                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                    MetricsReporter.class);
+            reporters.add(new JmxReporter(jmxPrefix));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+            String metricGrpPrefix = "consumer";
+            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+            metricsTags.put("client-id", clientId);
+            this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix,
metricsTags),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
+                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+            this.subscriptions = new SubscriptionState();
+            this.coordinator = new Coordinator(this.client,
+                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                    this.retryBackoffMs,
+                    config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                    this.metadata,
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time);
+
+            if (keyDeserializer == null) {
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.keyDeserializer.configure(config.originals(), false);
+            } else {
+                this.keyDeserializer = keyDeserializer;
+            }
+            if (valueDeserializer == null) {
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.valueDeserializer.configure(config.originals(), false);
+            } else {
+                this.valueDeserializer = valueDeserializer;
+            }
+            this.fetcher = new Fetcher<K, V>(this.client,
+                    this.retryBackoffMs,
+                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                    config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
+                    this.keyDeserializer,
+                    this.valueDeserializer,
+                    this.metadata,
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time);
+
+            config.logUnused();
+
+            log.debug("Kafka consumer created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
     }
 
     /**
@@ -806,13 +833,24 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
 
     @Override
     public synchronized void close() {
+        close(false);
+    }
+
+    private void close(boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
         this.closed = true;
-        this.metrics.close();
-        this.client.close();
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
+        ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         log.debug("The Kafka consumer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka consumer", firstException.get());
+        }
     }
 
+
     private boolean shouldAutoCommit(long now) {
         return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b70e1a3..42b1292 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -191,81 +192,89 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
 
     @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V>
valueSerializer) {
-        log.trace("Starting the Kafka producer");
-        this.producerConfig = config;
-        this.time = new SystemTime();
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                                                      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS);
-        String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
-          clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-        String jmxPrefix = "kafka.producer";
-        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        MetricsReporter.class);
-        reporters.add(new JmxReporter(jmxPrefix));
-        this.metrics = new Metrics(metricConfig, reporters, time);
-        this.partitioner = new Partitioner();
-        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
-        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
-        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
-        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-        Map<String, String> metricTags = new LinkedHashMap<String, String>();
-        metricTags.put("client-id", clientId);
-        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
-                                                 this.totalMemorySize,
-                                                 this.compressionType,
-                                                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
-                                                 retryBackoffMs,
-                                                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
-                                                 metrics,
-                                                 time,
-                                                 metricTags);
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
-
-        NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer",
metricTags),
-                                                 this.metadata,
-                                                 clientId,
-                                                 config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
-                                                 config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                                                 config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                                                 config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
-        this.sender = new Sender(client,
-                                 this.metadata,
-                                 this.accumulator,
-                                 config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                                 (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
-                                 config.getInt(ProducerConfig.RETRIES_CONFIG),
-                                 config.getInt(ProducerConfig.TIMEOUT_CONFIG),
-                                 this.metrics,
-                                 new SystemTime(),
-                                 clientId);
-        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0
? " | " + clientId : "");
-        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
-        this.ioThread.start();
-
-        this.errors = this.metrics.sensor("errors");
-
-        if (keySerializer == null) {
-            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                                                              Serializer.class);
-            this.keySerializer.configure(config.originals(), true);
-        } else {
-            this.keySerializer = keySerializer;
-        }
-        if (valueSerializer == null) {
-            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                                                                Serializer.class);
-            this.valueSerializer.configure(config.originals(), false);
-        } else {
-            this.valueSerializer = valueSerializer;
-        }
+        try {
+            log.trace("Starting the Kafka producer");
+            this.producerConfig = config;
+            this.time = new SystemTime();
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                            TimeUnit.MILLISECONDS);
+            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+            if (clientId.length() <= 0)
+                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            String jmxPrefix = "kafka.producer";
+            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                    MetricsReporter.class);
+            reporters.add(new JmxReporter(jmxPrefix));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.partitioner = new Partitioner();
+            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
+            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
+            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
+            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+            Map<String, String> metricTags = new LinkedHashMap<String, String>();
+            metricTags.put("client-id", clientId);
+            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
+                    this.totalMemorySize,
+                    this.compressionType,
+                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                    retryBackoffMs,
+                    config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
+                    metrics,
+                    time,
+                    metricTags);
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+
+            NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer",
metricTags),
+                    this.metadata,
+                    clientId,
+                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
+            this.sender = new Sender(client,
+                    this.metadata,
+                    this.accumulator,
+                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
+                    config.getInt(ProducerConfig.RETRIES_CONFIG),
+                    config.getInt(ProducerConfig.TIMEOUT_CONFIG),
+                    this.metrics,
+                    new SystemTime(),
+                    clientId);
+            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() >
0 ? " | " + clientId : "");
+            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
+            this.ioThread.start();
+
+            this.errors = this.metrics.sensor("errors");
+
+            if (keySerializer == null) {
+                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                        Serializer.class);
+                this.keySerializer.configure(config.originals(), true);
+            } else {
+                this.keySerializer = keySerializer;
+            }
+            if (valueSerializer == null) {
+                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                        Serializer.class);
+                this.valueSerializer.configure(config.originals(), false);
+            } else {
+                this.valueSerializer = valueSerializer;
+            }
 
-        config.logUnused();
-        log.debug("Kafka producer started");
+            config.logUnused();
+            log.debug("Kafka producer started");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka producer", t);
+        }
     }
 
     private static int parseAcks(String acksString) {
@@ -513,17 +522,36 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
      */
     @Override
     public void close() {
+        close(false);
+    }
+
+    private void close(boolean swallowException) {
         log.trace("Closing the Kafka producer.");
-        this.sender.initiateClose();
-        try {
-            this.ioThread.join();
-        } catch (InterruptedException e) {
-            throw new InterruptException(e);
+        // this will keep track of the first encountered exception
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        if (this.sender != null) {
+            try {
+                this.sender.initiateClose();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close sender", t);
+            }
         }
-        this.metrics.close();
-        this.keySerializer.close();
-        this.valueSerializer.close();
+        if (this.ioThread != null) {
+            try {
+                this.ioThread.join();
+            } catch (InterruptedException t) {
+                firstException.compareAndSet(null, t);
+                log.error("Interrupted while joining ioThread", t);
+            }
+        }
+        ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
+        ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
+        ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
         log.debug("The Kafka producer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka producer", firstException.get());
+        }
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 70954ca..b2db91c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -139,8 +139,11 @@ public class Sender implements Runnable {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
         }
-
-        this.client.close();
+        try {
+            this.client.close();
+        } catch (Exception e) {
+            log.error("Failed to close network client", e);
+        }
 
         log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index b3d3d7c..5f6caf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.metrics;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +47,7 @@ import org.apache.kafka.common.utils.Utils;
  * sensor.record(messageSize);
  * </pre>
  */
-public class Metrics {
+public class Metrics implements Closeable {
 
     private final MetricConfig config;
     private final ConcurrentMap<MetricName, KafkaMetric> metrics;
@@ -192,6 +193,7 @@ public class Metrics {
     /**
      * Close this metrics repository.
      */
+    @Override
     public void close() {
         for (MetricsReporter reporter : this.reporters)
             reporter.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 13be6a3..9a57579 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Deserializer<T> {
+public interface Deserializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -38,8 +39,4 @@ public interface Deserializer<T> {
      */
     public T deserialize(String topic, byte[] data);
 
-    /**
-     * Close this deserializer
-     */
-    public void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c2fdc23..c440540 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Serializer<T> {
+public interface Serializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -37,8 +38,4 @@ public interface Serializer<T> {
      */
     public byte[] serialize(String topic, T data);
 
-    /**
-     * Close this serializer
-     */
-    public void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..eea2c28
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class KafkaConsumerTest {
+
+    @Test
+    public void testConstructorClose() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        MockMetricsReporter.CLOSE_COUNT.set(0);
+        try {
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+                    props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        } catch (KafkaException e) {
+            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
+            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
new file mode 100644
index 0000000..49f1427
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class KafkaProducerTest {
+
+
+    @Test
+    public void testConstructorClose() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        MockMetricsReporter.CLOSE_COUNT.set(0);
+        try {
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
+                    props, new ByteArraySerializer(), new ByteArraySerializer());
+        } catch (KafkaException e) {
+            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
+            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/01e94e2b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
new file mode 100644
index 0000000..6f948f2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockMetricsReporter implements MetricsReporter {
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+
+    public MockMetricsReporter() {
+
+    }
+
+    @Override
+    public void init(List<KafkaMetric> metrics) {
+
+    }
+
+    @Override
+    public void metricChange(KafkaMetric metric) {
+
+    }
+
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+}


Mime
View raw message