kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Eliminate warnings from KafkaProducerTest (#5548)
Date Thu, 23 Aug 2018 13:40:46 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1090e5  MINOR: Eliminate warnings from KafkaProducerTest (#5548)
b1090e5 is described below

commit b1090e52a3c7ea83ef2d79cc4b6ff7084a5070a4
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Thu Aug 23 15:40:33 2018 +0200

    MINOR: Eliminate warnings from KafkaProducerTest (#5548)
    
    And code clean-ups in the same file.
    
    Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/producer/KafkaProducerTest.java  | 145 ++++++++++-----------
 1 file changed, 67 insertions(+), 78 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dd2dd89..22fa0a1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -45,7 +44,6 @@ import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockPartitioner;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -100,7 +98,7 @@ public class KafkaProducerTest {
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
-        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props,
new ByteArraySerializer(), new ByteArraySerializer())) {
+        try (KafkaProducer<byte[], byte[]> ignored = new KafkaProducer<>(props,
new ByteArraySerializer(), new ByteArraySerializer())) {
             fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -110,7 +108,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testSerializerClose() throws Exception {
+    public void testSerializerClose() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@@ -119,7 +117,7 @@ public class KafkaProducerTest {
         final int oldInitCount = MockSerializer.INIT_COUNT.get();
         final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
 
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                 configs, new MockSerializer(), new MockSerializer());
         assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
         assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
@@ -130,7 +128,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testInterceptorConstructClose() throws Exception {
+    public void testInterceptorConstructClose() {
         try {
             Properties props = new Properties();
             // test with client ID assigned by KafkaProducer
@@ -138,7 +136,7 @@ public class KafkaProducerTest {
             props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
             props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
 
-            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
                     props, new StringSerializer(), new StringSerializer());
             assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
             assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
@@ -156,14 +154,14 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testPartitionerClose() throws Exception {
+    public void testPartitionerClose() {
         try {
             Properties props = new Properties();
             props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
             MockPartitioner.resetCounters();
             props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
 
-            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
                     props, new StringSerializer(), new StringSerializer());
             assertEquals(1, MockPartitioner.INIT_COUNT.get());
             assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
@@ -189,7 +187,7 @@ public class KafkaProducerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -201,16 +199,13 @@ public class KafkaProducerTest {
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> closeException = new AtomicReference<>();
         try {
-            Future<?> future = executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    producer.send(new ProducerRecord<>("topic", "key", "value"));
-                    try {
-                        producer.close();
-                        fail("Close should block and throw.");
-                    } catch (Exception e) {
-                        closeException.set(e);
-                    }
+            Future<?> future = executor.submit(() -> {
+                producer.send(new ProducerRecord<>("topic", "key", "value"));
+                try {
+                    producer.close();
+                    fail("Close should block and throw.");
+                } catch (Exception e) {
+                    closeException.set(e);
                 }
             });
 
@@ -225,14 +220,11 @@ public class KafkaProducerTest {
 
             assertTrue("Close terminated prematurely", future.cancel(true));
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return closeException.get() != null;
-                }
-            }, "InterruptException did not occur within timeout.");
+            TestUtils.waitForCondition(() -> closeException.get() != null,
+                    "InterruptException did not occur within timeout.");
 
-            assertTrue("Expected exception not thrown " + closeException, closeException.get()
instanceof InterruptException);
+            assertTrue("Expected exception not thrown " + closeException,
+                    closeException.get() instanceof InterruptException);
         } finally {
             executor.shutdownNow();
         }
@@ -240,7 +232,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testOsDefaultSocketBufferSizes() throws Exception {
+    public void testOsDefaultSocketBufferSizes() {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
@@ -249,7 +241,7 @@ public class KafkaProducerTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void testInvalidSocketSendBufferSize() throws Exception {
+    public void testInvalidSocketSendBufferSize() {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2);
@@ -257,7 +249,7 @@ public class KafkaProducerTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void testInvalidSocketReceiveBufferSize() throws Exception {
+    public void testInvalidSocketReceiveBufferSize() {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
@@ -277,15 +269,15 @@ public class KafkaProducerTest {
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.<PartitionInfo>emptySet(),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet());
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.emptySet(),
+                Collections.emptySet());
 
         // Expect exactly one fetch for each attempt to refresh while topic metadata is not
available
         final int refreshAttempts = 5;
@@ -328,15 +320,15 @@ public class KafkaProducerTest {
         ProducerRecord<String, String> extendedRecord = new ProducerRecord<>(topic,
2, null, "value");
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.<PartitionInfo>emptySet(),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet());
         final Cluster initialCluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.emptySet(),
+                Collections.emptySet());
         final Cluster extendedCluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
@@ -344,8 +336,8 @@ public class KafkaProducerTest {
                         new PartitionInfo(topic, 0, null, null, null),
                         new PartitionInfo(topic, 1, null, null, null),
                         new PartitionInfo(topic, 2, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.emptySet(),
+                Collections.emptySet());
 
         // Expect exactly one fetch for each attempt to refresh while topic metadata is not
available
         final int refreshAttempts = 5;
@@ -405,18 +397,15 @@ public class KafkaProducerTest {
         MemberModifier.field(KafkaProducer.class, "time").set(producer, time);
         final String topic = "topic";
 
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                long startTimeMs = System.currentTimeMillis();
-                for (int i = 0; i < 10; i++) {
-                    while (!metadata.updateRequested() && System.currentTimeMillis()
- startTimeMs < 1000)
-                        yield();
-                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
-                    time.sleep(60 * 1000L);
-                }
+        Thread t = new Thread(() -> {
+            long startTimeMs = System.currentTimeMillis();
+            for (int i = 0; i < 10; i++) {
+                while (!metadata.updateRequested() && System.currentTimeMillis()
- startTimeMs < 1000)
+                    Thread.yield();
+                metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+                time.sleep(60 * 1000L);
             }
-        };
+        });
         t.start();
         try {
             producer.partitionsFor(topic);
@@ -432,8 +421,10 @@ public class KafkaProducerTest {
     public void testHeaders() throws Exception {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        ExtendedSerializer keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
-        ExtendedSerializer valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+        @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
+        ExtendedSerializer<String> keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+        @SuppressWarnings("unchecked")
+        ExtendedSerializer<String> valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
 
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer,
valueSerializer);
         Metadata metadata = PowerMock.createNiceMock(Metadata.class);
@@ -443,9 +434,9 @@ public class KafkaProducerTest {
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.emptySet(),
+                Collections.emptySet());
 
 
         EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
@@ -522,15 +513,16 @@ public class KafkaProducerTest {
         final Cluster cluster = new Cluster(
             "dummy",
             Collections.singletonList(new Node(0, "host1", 1000)),
-            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
-            Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+            Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+            Collections.emptySet(),
+            Collections.emptySet());
         EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
 
         // Mock interceptors field
-        ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
+        @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
+        ProducerInterceptors<String, String> interceptors = PowerMock.createMock(ProducerInterceptors.class);
         EasyMock.expect(interceptors.onSend(record)).andReturn(record);
-        interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(),
EasyMock.<Exception>notNull());
+        interceptors.onSendError(EasyMock.eq(record), EasyMock.notNull(), EasyMock.notNull());
         EasyMock.expectLastCall();
         MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
 
@@ -565,19 +557,16 @@ public class KafkaProducerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        Producer<String, String> producer = new KafkaProducer<>(
+        try (Producer<String, String> producer = new KafkaProducer<>(
                 new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
-        try {
+                new StringSerializer(), new StringSerializer(), metadata, client)) {
             producer.initTransactions();
             fail("initTransactions() should have raised TimeoutException");
-        } finally {
-            producer.close(0, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -593,7 +582,7 @@ public class KafkaProducerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -626,7 +615,7 @@ public class KafkaProducerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -638,16 +627,16 @@ public class KafkaProducerTest {
         String invalidTopicName = "topic abc";          // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName,
"HelloKafka");
 
-        Set<String> invalidTopic = new HashSet<String>();
+        Set<String> invalidTopic = new HashSet<>();
         invalidTopic.add(invalidTopicName);
         Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
                 cluster.nodes(),
-                new ArrayList<PartitionInfo>(0),
-                Collections.<String>emptySet(),
+                new ArrayList<>(0),
+                Collections.emptySet(),
                 invalidTopic,
                 cluster.internalTopics(),
                 cluster.controller());
-        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.emptySet());
 
         Future<RecordMetadata> future = producer.send(record);
 
@@ -669,7 +658,7 @@ public class KafkaProducerTest {
         Cluster cluster = TestUtils.singletonCluster();
         Node node = cluster.nodes().get(0);
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 


Mime
View raw message