spark-commits mailing list archives

From koenin...@apache.org
Subject spark git commit: [SPARK-21168] KafkaRDD should always set kafka clientId.
Date Mon, 23 Apr 2018 18:56:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master 293a0f29e -> 448d248f8


[SPARK-21168] KafkaRDD should always set kafka clientId.

https://issues.apache.org/jira/browse/SPARK-21168
There are not a number of other places where a client ID should be set, and I think we should
pass consumer.clientId to the clientId method, because the fetch request will be sent by that
same consumer.
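
The reasoning above can be sketched as a small standalone helper. This is only an illustration, not the code in KafkaRDD: the object name ClientIdSketch, the method buildRequest and the example host, port, topic and client ID are hypothetical, and it assumes the Kafka 0.8 client (kafka.api.FetchRequestBuilder, kafka.consumer.SimpleConsumer) is on the classpath. As far as I understand, the builder falls back to an empty client ID when clientId is not called, which is the case the change avoids.

```scala
import kafka.api.{FetchRequest, FetchRequestBuilder}
import kafka.consumer.SimpleConsumer

// Hypothetical helper, for illustration only.
object ClientIdSketch {
  // Builds a fetch request that carries the client ID of the consumer
  // that will later send it, instead of the builder's default.
  def buildRequest(
      consumer: SimpleConsumer,
      topic: String,
      partition: Int,
      offset: Long,
      maxBytes: Int): FetchRequest = {
    new FetchRequestBuilder()
      .clientId(consumer.clientId) // reuse the consumer's own client ID
      .addFetch(topic, partition, offset, maxBytes)
      .build()
  }
}

// Example values below are assumptions, not taken from the commit.
// val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "example-client")
// val req = ClientIdSketch.buildRequest(consumer, "example-topic", 0, 0L, 1024 * 1024)
// val resp = consumer.fetch(req)
```

Taking the ID from the consumer rather than from a separate setting keeps the request and the connection that sends it labeled the same way on the broker side.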

Author: liuzhaokun <liu.zhaokun@zte.com.cn>

Closes #19887 from liu-zhaokun/master1205.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/448d248f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/448d248f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/448d248f

Branch: refs/heads/master
Commit: 448d248f897fa39cfc82d71a3d6b67e6470f8a02
Parents: 293a0f2
Author: liuzhaokun <liu.zhaokun@zte.com.cn>
Authored: Mon Apr 23 13:56:11 2018 -0500
Committer: cody koeninger <cody@koeninger.org>
Committed: Mon Apr 23 13:56:11 2018 -0500

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/448d248f/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 5ea52b6..791cf0e 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -191,6 +191,7 @@ class KafkaRDD[
 
     private def fetchBatch: Iterator[MessageAndOffset] = {
       val req = new FetchRequestBuilder()
+        .clientId(consumer.clientId)
         .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
         .build()
       val resp = consumer.fetch(req)

