kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter
Date Tue, 27 Mar 2018 23:12:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416444#comment-16416444 ]

ASF GitHub Bot commented on KAFKA-6386:
---------------------------------------

guozhangwang closed pull request #4354: KAFKA-6386:use Properties instead of StreamsConfig
in KafkaStreams constructor
URL: https://github.com/apache/kafka/pull/4354
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index baf9633a0c3..464854c57ad 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,7 +86,11 @@ <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams
API
         to let users specify inner serdes if the default serde classes are windowed serdes.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
     /<p>
-    
+
+    <p>
+        We have deprecated <code>StreamsConfig</code> in <code>KafkaStreams</code> constructors. Now we only take in <code>java.util.Properties</code> since <code>StreamsConfig</code> is immutable and is created from a Properties object itself.
+        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
+    </p>
     <p>
       Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API
(<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
       To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 2ea5218d647..8a6ec05b4ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -26,7 +26,7 @@
 /**
  * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams}
instance.
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig, KafkaClientSupplier)
+ * @see KafkaStreams#KafkaStreams(Topology, java.util.Properties, KafkaClientSupplier)
  */
 public interface KafkaClientSupplier {
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1a70e4638ef..186276c22d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -515,31 +515,18 @@ public void onRestoreEnd(final TopicPartition topicPartition, final
String store
     }
 
     /**
-     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     * Create a {@code KafkaStreams} instance.
+     * <p>
+     * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
+     * you still must {@link #close()} it to avoid resource leaks.
+     *
+     * @param topology the topology specifying the computational logic
+     * @param props    properties for {@link StreamsConfig}
+     * @throws StreamsException if any fatal error occurs
      */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+    public KafkaStreams(final Topology topology,
                         final Properties props) {
-        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config) {
-        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config,
-                        final KafkaClientSupplier clientSupplier) {
-        this(builder.internalTopologyBuilder, config, clientSupplier);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
@@ -548,13 +535,16 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder
bui
      * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
      * you still must {@link #close()} it to avoid resource leaks.
      *
-     * @param topology the topology specifying the computational logic
-     * @param props   properties for {@link StreamsConfig}
+     * @param topology       the topology specifying the computational logic
+     * @param props          properties for {@link StreamsConfig}
+     * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
+     *                       for the new {@code KafkaStreams} instance
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
-                        final Properties props) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+                        final Properties props,
+                        final KafkaClientSupplier clientSupplier) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
     }
 
     /**
@@ -563,13 +553,15 @@ public KafkaStreams(final Topology topology,
      * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
      * you still must {@link #close()} it to avoid resource leaks.
      *
-     * @param topology the topology specifying the computational logic
-     * @param config  the Kafka Streams configuration
+     * @param topology       the topology specifying the computational logic
+     * @param props          properties for {@link StreamsConfig}
+     * @param time           {@code Time} implementation; cannot be null
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
-                        final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+                        final Properties props,
+                        final Time time) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
     }
 
     /**
@@ -579,11 +571,60 @@ public KafkaStreams(final Topology topology,
      * you still must {@link #close()} it to avoid resource leaks.
      *
      * @param topology       the topology specifying the computational logic
-     * @param config         the Kafka Streams configuration
+     * @param props          properties for {@link StreamsConfig}
      * @param clientSupplier the Kafka clients supplier which provides underlying producer
and consumer clients
      *                       for the new {@code KafkaStreams} instance
+     * @param time           {@code Time} implementation; cannot be null
      * @throws StreamsException if any fatal error occurs
      */
+    public KafkaStreams(final Topology topology,
+                        final Properties props,
+                        final KafkaClientSupplier clientSupplier,
+                        final Time time) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final Properties props) {
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final StreamsConfig config) {
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier) {
+        this(builder.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final Topology topology,
+                        final StreamsConfig config) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
+     */
+    @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
@@ -591,13 +632,9 @@ public KafkaStreams(final Topology topology,
     }
 
     /**
-     * Create a {@code KafkaStreams} instance.
-     *
-     * @param topology       the topology specifying the computational logic
-     * @param config         the Kafka Streams configuration
-     * @param time           {@code Time} implementation; cannot be null
-     * @throws StreamsException if any fatal error occurs
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, Time)} instead
      */
+    @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final Time time) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ecfcad80e81..0eaf9ee127d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -123,7 +123,7 @@
  * </ul>
  *
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, Properties)
  * @see ConsumerConfig
  * @see ProducerConfig
  */
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9770173f994..1dc8602c280 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -130,7 +130,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws
Exception
             Collections.<String>emptySet(), nodes.get(0));
         MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setClusterForAdminClient(cluster);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
         streams.close();
         TestUtils.waitForCondition(new TestCondition() {
             @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 16d2611a8a3..80ab60647ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -193,7 +193,7 @@ void runTest(final List<List<String>> expectedResult, final
String storeName) th
         assert expectedResult.size() == input.size();
 
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
         String expectedFinalResult = null;
 
@@ -234,7 +234,7 @@ void runTest(final String expectedFinalResult) throws Exception {
      */
     void runTest(final String expectedFinalResult, final String storeName) throws Exception
{
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
         try {
             streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 4b983cf2da0..9dfb6dda37e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -166,7 +166,7 @@ public void setup() {
                .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
 
-        kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), time);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1da4c580104..57ed161b084 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -145,7 +145,7 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         final List<String> assignedTopics = new ArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object>
config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(),
new ByteArrayDeserializer()) {
@@ -185,8 +185,6 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A",
"TEST-TOPIC-B");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
 
-        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
-
         CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -196,7 +194,7 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
         final List<String> assignedTopics = new ArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object>
config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(),
new ByteArrayDeserializer()) {
@@ -333,9 +331,8 @@ public void testMultipleConsumersCanReadFromPartitionedTopic() throws
Exception
 
             final List<String> leaderAssignment = new ArrayList<>();
             final List<String> followerAssignment = new ArrayList<>();
-            StreamsConfig config = new StreamsConfig(streamsConfiguration);
 
-            partitionedStreamsLeader  = new KafkaStreams(builderLeader.build(), config, new DefaultKafkaClientSupplier() {
+            partitionedStreamsLeader  = new KafkaStreams(builderLeader.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                 @Override
                 public Consumer<byte[], byte[]> getConsumer(final Map<String, Object>
config) {
                     return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(),
new ByteArrayDeserializer()) {
@@ -347,7 +344,7 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener
list
 
                 }
             });
-            partitionedStreamsFollower  = new KafkaStreams(builderFollower.build(), config, new DefaultKafkaClientSupplier() {
+            partitionedStreamsFollower  = new KafkaStreams(builderFollower.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                 @Override
                 public Consumer<byte[], byte[]> getConsumer(final Map<String, Object>
config) {
                     return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(),
new ByteArrayDeserializer()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Boyang Chen
>            Priority: Minor
>              Labels: beginner, kip, newbie
>             Fix For: 1.2.0
>
>
> Currently, the {{KafkaStreams}} constructor has overloads that take either {{Properties}} or {{StreamsConfig}} as parameters.
> Because {{StreamsConfig}} is immutable and is itself created from a {{Properties}} object, the constructors accepting {{StreamsConfig}} are not useful and only add boilerplate code. Thus, we should deprecate those constructors in order to remove them eventually.
> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor
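
The reasoning in the description can be illustrated with a small, library-free sketch. It does not use the Kafka API: ImmutableConfig and Client are hypothetical stand-ins for StreamsConfig and KafkaStreams, invented here only to show why an overload that accepts the immutable wrapper adds nothing over one that accepts the Properties it is built from.

```java
import java.util.Properties;

public class ConstructorMigrationSketch {

    /** Hypothetical stand-in for StreamsConfig: an immutable snapshot of a Properties object. */
    static final class ImmutableConfig {
        private final Properties snapshot = new Properties();

        ImmutableConfig(final Properties props) {
            // Copy the entries so later changes to the caller's Properties do not leak in.
            for (final String name : props.stringPropertyNames()) {
                snapshot.setProperty(name, props.getProperty(name));
            }
        }

        String get(final String key) {
            return snapshot.getProperty(key);
        }
    }

    /** Hypothetical stand-in for KafkaStreams, offering both constructor styles. */
    static final class Client {
        private final ImmutableConfig config;

        /** Preferred style: accept Properties and build the config internally. */
        Client(final Properties props) {
            this.config = new ImmutableConfig(props);
        }

        /** Old style: the caller wraps the Properties first. Kept only for compatibility. */
        @Deprecated
        Client(final ImmutableConfig config) {
            this.config = config;
        }

        String applicationId() {
            return config.get("application.id");
        }
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("application.id", "demo-app");

        // Both call sites end up with the same configuration; the first only adds a wrapping step.
        final Client viaOldStyle = new Client(new ImmutableConfig(props));
        final Client viaProperties = new Client(props);
        System.out.println(viaOldStyle.applicationId().equals(viaProperties.applicationId()));
    }
}
```

In this sketch the deprecated overload stays in place so existing callers keep compiling, which mirrors the deprecate-then-remove approach proposed in the description.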



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
