kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2121; Fix Closeable backward-compatibility; reviewed by Guozhang Wang
Date Fri, 08 May 2015 04:02:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 31dadf024 -> 5a47ef8ec


KAFKA-2121; Fix Closeable backward-compatibility; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a47ef8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a47ef8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a47ef8e

Branch: refs/heads/trunk
Commit: 5a47ef8ecbdf574bb18bd9ee5397188097924558
Parents: 31dadf0
Author: Steven Wu <stevenz3wu@gmail.com>
Authored: Thu May 7 17:04:51 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 7 17:04:51 2015 -0700

----------------------------------------------------------------------
 .../common/serialization/Deserializer.java      |  2 ++
 .../kafka/common/serialization/Serializer.java  |  2 ++
 .../clients/consumer/KafkaConsumerTest.java     |  7 +++--
 .../clients/producer/KafkaProducerTest.java     | 30 +++++++++++++++++---
 .../apache/kafka/test/MockMetricsReporter.java  |  3 +-
 5 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a47ef8e/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 9a57579..254b556 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -39,4 +39,6 @@ public interface Deserializer<T> extends Closeable {
      */
     public T deserialize(String topic, byte[] data);
 
+    @Override
+    public void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a47ef8e/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c440540..16a67a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -38,4 +38,6 @@ public interface Serializer<T> extends Closeable {
      */
     public byte[] serialize(String topic, T data);
 
+    @Override
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a47ef8e/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index eea2c28..738f3ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -33,13 +33,14 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        MockMetricsReporter.CLOSE_COUNT.set(0);
+        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
                     props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
-            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
-            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
+            Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
             return;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a47ef8e/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 49f1427..f3f8334 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockSerializer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,22 +29,43 @@ public class KafkaProducerTest {
 
 
     @Test
-    public void testConstructorClose() throws Exception {
+    public void testConstructorFailureCloseResource() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
         props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        MockMetricsReporter.CLOSE_COUNT.set(0);
+        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                     props, new ByteArraySerializer(), new ByteArraySerializer());
         } catch (KafkaException e) {
-            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
-            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
+            Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
             return;
         }
         Assert.fail("should have caught an exception and returned");
     }
+
+    @Test
+    public void testSerializerClose() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        final int oldInitCount = MockSerializer.INIT_COUNT.get();
+        final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
+                props, new MockSerializer(), new MockSerializer());
+        Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+        Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
+
+        producer.close();
+        Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+        Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a47ef8e/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index 6f948f2..2a8fa1f 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockMetricsReporter implements MetricsReporter {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
 
     public MockMetricsReporter() {
@@ -32,7 +33,7 @@ public class MockMetricsReporter implements MetricsReporter {
 
     @Override
     public void init(List<KafkaMetric> metrics) {
-
+        INIT_COUNT.incrementAndGet();
     }
 
     @Override


Mime
View raw message