Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BA0E2200CA6 for ; Tue, 13 Jun 2017 14:14:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B87C2160BDC; Tue, 13 Jun 2017 12:14:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D88B4160BC9 for ; Tue, 13 Jun 2017 14:14:33 +0200 (CEST) Received: (qmail 78224 invoked by uid 500); 13 Jun 2017 12:14:33 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 78215 invoked by uid 99); 13 Jun 2017 12:14:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Jun 2017 12:14:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBB23DFAF1; Tue, 13 Jun 2017 12:14:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: <5988434a68ef45bb85d23cb5928aeeed@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Avoid calling interceptors.onSendError() with null TopicPartition Date: Tue, 13 Jun 2017 12:14:32 +0000 (UTC) archived-at: Tue, 13 Jun 2017 12:14:34 -0000 Repository: kafka Updated Branches: refs/heads/0.11.0 ad74035d5 -> d98993bc1 MINOR: Avoid calling interceptors.onSendError() with null TopicPartition Assign non-null tp as soon as possible once we know the partition. This is so that if ensureValidRecordSize() throws, the interceptors.onSendError() call is made with a non-null tp. Author: Tom Bentley Reviewers: Ismael Juma Closes #3280 from tombentley/tp-assign (cherry picked from commit 4660bac4eb45a65767d05b03d275e3bf08a85c2a) Signed-off-by: Ismael Juma Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d98993bc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d98993bc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d98993bc Branch: refs/heads/0.11.0 Commit: d98993bc1583f37492d1d7fda094f792c4d8c2c2 Parents: ad74035 Author: Tom Bentley Authored: Tue Jun 13 11:39:17 2017 +0100 Committer: Ismael Juma Committed: Tue Jun 13 13:14:28 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../clients/producer/KafkaProducerTest.java | 38 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d98993bc/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 4f155a4..9e84a31 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -630,6 +630,7 @@ public class KafkaProducer implements Producer { " specified in value.serializer"); } int partition = partition(record, serializedKey, serializedValue, cluster); + tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); @@ -637,7 +638,6 @@ public class KafkaProducer implements Producer { int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); - tp = new TopicPartition(record.topic(), partition); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback http://git-wip-us.apache.org/repos/asf/kafka/blob/d98993bc/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index e2fe614..56c1b18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -18,10 +18,12 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -434,5 +436,41 @@ public class KafkaProducerTest { assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); } } + + @PrepareOnlyThisForTest(Metadata.class) + @Test + public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1"); + String topic = "topic"; + ProducerRecord record = new ProducerRecord<>(topic, "value"); + + KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), + new StringSerializer()); + Metadata metadata = PowerMock.createNiceMock(Metadata.class); + MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); + final Cluster cluster = new Cluster( + "dummy", + Collections.singletonList(new Node(0, "host1", 1000)), + Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), + Collections.emptySet(), + Collections.emptySet()); + EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); + + // Mock interceptors field + ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class); + EasyMock.expect(interceptors.onSend(record)).andReturn(record); + interceptors.onSendError(EasyMock.eq(record), EasyMock.notNull(), EasyMock.notNull()); + EasyMock.expectLastCall(); + MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors); + + PowerMock.replay(metadata); + EasyMock.replay(interceptors); + producer.send(record); + + EasyMock.verify(interceptors); + + } }