kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7781; Add validation check for retention.ms topic property.
Date Sun, 13 Jan 2019 13:15:12 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb3eedc  KAFKA-7781; Add validation check for retention.ms topic property.
cb3eedc is described below

commit cb3eedcf9414244b3e75e0e1d91ac149a573d8b8
Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
AuthorDate: Sun Jan 13 18:44:29 2019 +0530

    KAFKA-7781; Add validation check for retention.ms topic property.
    
    Using AdminClient#alterConfigs, topic `retention.ms` property can be assigned to a value
lesser than -1. This leads to inconsistency while describing the topic configuration. We should
not allow values lesser than -1.
    
    Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>,Matthias J. Sax <matthias@confluent.io>
    
    Closes #6082 from kamalcph/KAFKA-7781
---
 core/src/main/scala/kafka/log/LogConfig.scala                  |  2 +-
 .../src/main/java/org/apache/kafka/streams/StreamsBuilder.java |  4 ++--
 .../main/java/org/apache/kafka/streams/kstream/KStream.java    | 10 +++++-----
 .../kstream/internals/suppress/EagerBufferConfigImpl.java      |  2 +-
 .../kafka/streams/processor/internals/InternalTopicConfig.java |  2 +-
 .../streams/processor/internals/RepartitionTopicConfig.java    |  2 +-
 .../processor/internals/UnwindowedChangelogTopicConfig.java    |  2 +-
 .../processor/internals/WindowedChangelogTopicConfig.java      | 10 ++++++++--
 8 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index d872e09..ab58949 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -232,7 +232,7 @@ object LogConfig {
       .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc,
         KafkaConfig.LogRetentionBytesProp)
       // can be negative. See kafka.log.LogManager.cleanupExpiredSegments
-      .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc,
+      .define(RetentionMsProp, LONG, Defaults.RetentionMs, atLeast(-1), MEDIUM, RetentionMsDoc,
         KafkaConfig.LogRetentionTimeMillisProp)
       .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc,
         KafkaConfig.MessageMaxBytesProp)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 442c87c..fa8d05d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -199,7 +199,7 @@ public class StreamsBuilder {
      * You should only specify serdes in the {@link Consumed} instance as these will also
be used to overwrite the
      * serdes in {@link Materialized}, i.e.,
      * <pre> {@code
-     * streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String(), Materialized.<String,
String, KeyValueStore<Bytes, byte[]>as(storeName))
+     * streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String,
String, KeyValueStore<Bytes, byte[]>as(storeName))
      * }
      * </pre>
      * To query the local {@link KeyValueStore} it must be obtained via
@@ -372,7 +372,7 @@ public class StreamsBuilder {
      * You should only specify serdes in the {@link Consumed} instance as these will also
be used to overwrite the
      * serdes in {@link Materialized}, i.e.,
      * <pre> {@code
-     * streamBuilder.globalTable(topic, Consumed.with(Serde.String(), Serde.String(), Materialized.<String,
String, KeyValueStore<Bytes, byte[]>as(storeName))
+     * streamBuilder.globalTable(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.<String,
String, KeyValueStore<Bytes, byte[]>as(storeName))
      * }
      * </pre>
      * To query the local {@link KeyValueStore} it must be obtained via
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b961709..55bde9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -64,7 +64,7 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains only those records that satisfy the given
predicate
      * @see #filterNot(Predicate)
      */
-    KStream<K, V> filter(Predicate<? super K, ? super V> predicate);
+    KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
      * Create a new {@code KStream} that consists all records of this stream which do <em>not</em>
satisfy the given
@@ -76,7 +76,7 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains only those records that do <em>not</em>
satisfy the given predicate
      * @see #filter(Predicate)
      */
-    KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate);
+    KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
      * Set a new key (with possibly new type) for each input record.
@@ -109,7 +109,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      */
-    <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends
KR> mapper);
+    <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super
V, ? extends KR> mapper);
 
     /**
      * Transform each record of the input stream into a new record in the output stream (both
key and value type can be
@@ -148,7 +148,7 @@ public interface KStream<K, V> {
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
-    <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends
KeyValue<? extends KR, ? extends VR>> mapper);
+    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V,
? extends KeyValue<? extends KR, ? extends VR>> mapper);
 
     /**
      * Transform the value of each input record into a new value (with possible new type)
of the output record.
@@ -183,7 +183,7 @@ public interface KStream<K, V> {
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
-    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR>
mapper);
+    <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR>
mapper);
 
     /**
      * Transform the value of each input record into a new value (with possible new type)
of the output record.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 90456d9..e94abc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -75,6 +75,6 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
 
     @Override
     public String toString() {
-        return "EagerBufferConfigImpl{maxKeys=" + maxRecords + ", maxBytes=" + maxBytes +
'}';
+        return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes
+ '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index c9a3344..aa565e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -40,7 +40,7 @@ public abstract class InternalTopicConfig {
     }
 
     /**
-     * Get the configured properties for this topic. If rententionMs is set then
+     * Get the configured properties for this topic. If retentionMs is set then
      * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
      *
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index a7a8d88..466520e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -45,7 +45,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
     }
 
     /**
-     * Get the configured properties for this topic. If rententionMs is set then
+     * Get the configured properties for this topic. If retentionMs is set then
      * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
      *
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
index e55ce71..acca837 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
@@ -40,7 +40,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig
{
     }
 
     /**
-     * Get the configured properties for this topic. If rententionMs is set then
+     * Get the configured properties for this topic. If retentionMs is set then
      * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
      *
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
index e177bea..55d548c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
@@ -42,7 +42,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
     }
 
     /**
-     * Get the configured properties for this topic. If rententionMs is set then
+     * Get the configured properties for this topic. If retentionMs is set then
      * we add additionalRetentionMs to work out the desired retention when cleanup.policy=compact,delete
      *
      * @param additionalRetentionMs - added to retention to allow for clock drift etc
@@ -57,7 +57,13 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
         topicConfig.putAll(topicConfigs);
 
         if (retentionMs != null) {
-            topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs +
additionalRetentionMs));
+            long retentionValue;
+            try {
+                retentionValue = Math.addExact(retentionMs, additionalRetentionMs);
+            } catch (final ArithmeticException swallow) {
+                retentionValue = Long.MAX_VALUE;
+            }
+            topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionValue));
         }
 
         return topicConfig;


Mime
View raw message