kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5442: Streams producer client.id are not unique for EOS
Date Thu, 15 Jun 2017 18:05:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3e99db845 -> 46e9869af


KAFKA-5442: Streams producer client.id are not unique for EOS

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3329 from mjsax/kafka-5442-producer-id-conflict

(cherry picked from commit 1d556e471411b3ffa5bf65afd08d1a78ee40e710)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46e9869a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46e9869a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46e9869a

Branch: refs/heads/0.11.0
Commit: 46e9869afe1cfbf1b2509b1676dcae6ce59ea55d
Parents: 3e99db8
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Jun 15 11:05:12 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 15 11:05:24 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46e9869a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 372f314..ef3d44a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1250,15 +1250,16 @@ public class StreamThread extends Thread {
     }
 
     private Producer<byte[], byte[]> createProducer(final TaskId id) {
-        final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
 
         final Producer<byte[], byte[]> producer;
         if (eosEnabled) {
+            final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId
+ "-" + id);
             log.info("{} Creating producer client for task {}", logPrefix, id);
             producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-"
+ id);
             producer = clientSupplier.getProducer(producerConfigs);
         } else {
             if (threadProducer == null) {
+                final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
                 log.info("{} Creating shared producer client", logPrefix);
                 threadProducer = clientSupplier.getProducer(producerConfigs);
             }


Mime
View raw message