kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Avoid calling interceptors.onSendError() with null TopicPartition
Date Tue, 13 Jun 2017 12:14:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ad74035d5 -> d98993bc1


MINOR: Avoid calling interceptors.onSendError() with null TopicPartition

Assign non-null tp as soon as possible once we know the partition. This is
so that if ensureValidRecordSize() throws, the
interceptors.onSendError() call is made with a non-null tp.

Author: Tom Bentley <tbentley@redhat.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3280 from tombentley/tp-assign

(cherry picked from commit 4660bac4eb45a65767d05b03d275e3bf08a85c2a)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d98993bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d98993bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d98993bc

Branch: refs/heads/0.11.0
Commit: d98993bc1583f37492d1d7fda094f792c4d8c2c2
Parents: ad74035
Author: Tom Bentley <tbentley@redhat.com>
Authored: Tue Jun 13 11:39:17 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jun 13 13:14:28 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  2 +-
 .../clients/producer/KafkaProducerTest.java     | 38 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d98993bc/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4f155a4..9e84a31 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -630,6 +630,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
                         " specified in value.serializer");
             }
             int partition = partition(record, serializedKey, serializedValue, cluster);
+            tp = new TopicPartition(record.topic(), partition);
 
             setReadOnly(record.headers());
             Header[] headers = record.headers().toArray();
@@ -637,7 +638,6 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                     serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
-            tp = new TopicPartition(record.topic(), partition);
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record,
callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98993bc/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index e2fe614..56c1b18 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,10 +18,12 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -434,5 +436,41 @@ public class KafkaProducerTest {
             assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
         }
     }
+    
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
+        String topic = "topic";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+        
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new
StringSerializer(),
+                new StringSerializer());
+        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+        final Cluster cluster = new Cluster(
+            "dummy",
+            Collections.singletonList(new Node(0, "host1", 1000)),
+            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+            Collections.<String>emptySet(),
+            Collections.<String>emptySet());
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        
+        // Mock interceptors field
+        ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
+        EasyMock.expect(interceptors.onSend(record)).andReturn(record);
+        interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(),
EasyMock.<Exception>notNull());
+        EasyMock.expectLastCall();
+        MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
+        
+        PowerMock.replay(metadata);
+        EasyMock.replay(interceptors);
+        producer.send(record);
+        
+        EasyMock.verify(interceptors);
+        
+    }
 
 }


Mime
View raw message