kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: some public JavaDoc cleanup
Date Mon, 09 Jan 2017 23:17:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 52e397962 -> 64514ff5e


MINOR: some public JavaDoc cleanup

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2332 from mjsax/javaDocImprovements5


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64514ff5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64514ff5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64514ff5

Branch: refs/heads/trunk
Commit: 64514ff5ec2ec9616dd7b11308d8da4f095edb37
Parents: 52e3979
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Jan 9 15:17:23 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 9 15:17:23 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/Aggregator.java       |  4 +-
 .../kafka/streams/kstream/ForeachAction.java    |  2 +-
 .../kafka/streams/kstream/Initializer.java      |  2 +
 .../kafka/streams/kstream/KGroupedStream.java   | 44 +++++++++++---------
 .../kafka/streams/kstream/KeyValueMapper.java   |  4 +-
 .../apache/kafka/streams/kstream/Merger.java    |  4 +-
 .../apache/kafka/streams/kstream/Predicate.java |  2 +-
 .../apache/kafka/streams/kstream/Reducer.java   |  4 +-
 .../kafka/streams/kstream/Transformer.java      |  6 +--
 .../kafka/streams/kstream/ValueJoiner.java      |  2 +-
 .../kafka/streams/kstream/ValueMapper.java      |  2 +-
 .../kafka/streams/kstream/ValueTransformer.java |  6 +--
 12 files changed, 46 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index 9afce57..e433ea7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
String)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde,
String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde,
org.apache.kafka.streams.processor.StateStoreSupplier)
  * @see Reducer
  */
 @InterfaceStability.Unstable
@@ -48,5 +50,5 @@ public interface Aggregator<K, V, VA> {
      * @param aggregate the current aggregate value
      * @return the new aggregate value
      */
-    VA apply(K key, V value, VA aggregate);
+    VA apply(final K key, final V value, final VA aggregate);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
index 59f6fab..e68bf8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -40,7 +40,7 @@ public interface ForeachAction<K, V> {
      * @param key   the key of the record
      * @param value the value of the record
      */
-    void apply(K key, V value);
+    void apply(final K key, final V value);
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
index bee598a..96a6995 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * @see KGroupedStream#aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde,
String)
  * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde,
String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde,
org.apache.kafka.streams.processor.StateStoreSupplier)
  */
 @InterfaceStability.Unstable
 public interface Initializer<VA> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index fc2881a..c3e2f1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -263,7 +263,7 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like
sum, min, or max.
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
@@ -303,7 +303,8 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate
functions like sum, min, or
+     * max.
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
@@ -349,7 +350,7 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions
like sum, min, or max.
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
@@ -397,7 +398,8 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute
aggregate functions like sum,
+     * min, or max.
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
@@ -445,7 +447,8 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate
functions like sum, min,
+     * or max.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
@@ -495,7 +498,8 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied an the new aggregate
will be the record's
      * value as-is.
-     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions
like sum, min, or max.
+     * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute
aggregate functions like
+     * sum, min, or max.
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
@@ -547,13 +551,13 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Serde, String)} can be used to compute
aggregate functions like
-     * count (c.f. {@link #count(String)}) TODO add more examples.
+     * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute
aggregate functions like
+     * count (c.f. {@link #count(String)})
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
-     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * KafkaStreams streams = ... // some aggregation on value type double
      * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String,
Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared
over all running Kafka Streams instances)
@@ -595,14 +599,14 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used
to compute aggregate functions
-     * like count (c.f. {@link #count(String)}) TODO add more examples.
+     * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to
compute aggregate functions
+     * like count (c.f. {@link #count(String)})
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
-     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * KafkaStreams streams = ... // some aggregation on value type double
      * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String,
Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared
over all running Kafka Streams instances)
@@ -646,13 +650,13 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used
to compute aggregate
-     * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used
to compute aggregate
+     * functions like count (c.f. {@link #count(String)})
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
-     * KafkaStreams streams = ... // some windowed aggregation on value type double TODO
update example
+     * KafkaStreams streams = ... // some windowed aggregation on value type double
      * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName,
QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-key";
      * long fromTime = ...;
@@ -703,7 +707,7 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used
to compute aggregate
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can
be used to compute aggregate
      * functions like count (c.f. {@link #count(String)}) TODO add more examples.
      * <p>
      * To query the local windowed {@link KeyValueStore} it must be obtained via
@@ -750,8 +754,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}
can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)})
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}
can be used to compute
+     * aggregate functions like count (c.f. {@link #count(String)})
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
@@ -800,8 +804,8 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}
can be used to compute aggregate
-     * functions like count (c.f. {@link #count(String)})
+     * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)}
can be used
+     * to compute aggregate functions like count (c.f. {@link #count(String)})
      * <p>
      * To query the local {@link SessionStore} it must be obtained via
      * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 6fcf61c..d6d1def 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * @see KStream#groupBy(KeyValueMapper)
  * @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
  * @see KTable#groupBy(KeyValueMapper)
- * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serdes, org.apache.kafka.common.serialization.Serde)
+ * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
  * @see KTable#toStream(KeyValueMapper)
  */
 @InterfaceStability.Unstable
@@ -54,5 +54,5 @@ public interface KeyValueMapper<K, V, VR> {
      * @param value the value of the record
      * @return the new value
      */
-    VR apply(K key, V value);
+    VR apply(final K key, final V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
index fcdb20f..5a70f21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The interface for merging aggregate values for {@link SessionWindows} with the given key.
  *
  * @param <K>   key type
- * @param <T>   aggregate value type
+ * @param <V>   aggregate value type
  */
 @InterfaceStability.Unstable
 public interface Merger<K, V> {
@@ -36,5 +36,5 @@ public interface Merger<K, V> {
      * @param aggTwo    the second aggregate
      * @return          the new aggregate value
      */
-    V apply(K aggKey, V aggOne, V aggTwo);
+    V apply(final K aggKey, final V aggOne, final V aggTwo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
index b46e60f..24563ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -43,5 +43,5 @@ public interface Predicate<K, V> {
      * @return {@code true} if the {@link org.apache.kafka.streams.KeyValue key-value pair}
satisfies the
      * predicate&mdash;{@code false} otherwise
      */
-    boolean test(K key, V value);
+    boolean test(final K key, final V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
index 5791d9c..d19c1eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
  * @see KGroupedStream#reduce(Reducer, Windows, String)
  * @see KGroupedStream#reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
  * @see Aggregator
  */
 @InterfaceStability.Unstable
@@ -45,5 +47,5 @@ public interface Reducer<V> {
      * @param value2 the second value for the aggregation
      * @return the aggregated value
      */
-    V apply(V value1, V value2);
+    V apply(final V value1, final V value2);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index e9363e8..95d822a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -37,7 +37,7 @@ public interface Transformer<K, V, R> {
      *
      * @param context the context; may not be null
      */
-    void init(ProcessorContext context);
+    void init(final ProcessorContext context);
 
     /**
      * Transform the record with the given key and value.
@@ -46,7 +46,7 @@ public interface Transformer<K, V, R> {
      * @param value the value for the record
      * @return new value; if null no key-value pair will be forwarded to down stream
      */
-    R transform(K key, V value);
+    R transform(final K key, final V value);
 
     /**
      * Perform any periodic operations and possibly generate a key, if this processor {@link
ProcessorContext#schedule(long) schedules itself} with the context
@@ -55,7 +55,7 @@ public interface Transformer<K, V, R> {
      * @param timestamp the stream time when this method is being called
      * @return new value; if null it will not be forwarded to down stream
      */
-    R punctuate(long timestamp);
+    R punctuate(final long timestamp);
 
     /**
      * Close this processor and clean up any resources.

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index e51889b..ab91bb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -51,5 +51,5 @@ public interface ValueJoiner<V1, V2, VR> {
      * @param value2 the second value for joining
      * @return the joined value
      */
-    VR apply(V1 value1, V2 value2);
+    VR apply(final V1 value1, final V2 value2);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 7d1a096..5099ac7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -42,5 +42,5 @@ public interface ValueMapper<V, VR> {
      * @param value the value to be mapped
      * @return the new value
      */
-    VR apply(V value);
+    VR apply(final V value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64514ff5/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index f92d9a1..063c352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -36,7 +36,7 @@ public interface ValueTransformer<V, R> {
      *
      * @param context the context; may not be null
      */
-    void init(ProcessorContext context);
+    void init(final ProcessorContext context);
 
     /**
      * Transform the record with the given key and value.
@@ -44,7 +44,7 @@ public interface ValueTransformer<V, R> {
      * @param value the value for the record
      * @return new value
      */
-    R transform(V value);
+    R transform(final V value);
 
     /**
      * Perform any periodic operations and possibly return a new value, if this processor
{@link ProcessorContext#schedule(long) schedule itself} with the context
@@ -53,7 +53,7 @@ public interface ValueTransformer<V, R> {
      * @param timestamp the stream time when this method is being called
      * @return new value; if null it will not be forwarded to down stream
      */
-    R punctuate(long timestamp);
+    R punctuate(final long timestamp);
 
     /**
      * Close this processor and clean up any resources.


Mime
View raw message