kafka-commits mailing list archives

From: mj...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-6354: Update KStream JavaDoc using new State Store API (#4456)
Date: Fri, 02 Feb 2018 19:24:24 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6bbec4  KAFKA-6354: Update KStream JavaDoc using new State Store API (#4456)
f6bbec4 is described below

commit f6bbec4af2f1842ea7be35899e3c7d60179ab2fb
Author: Yu <yu.liu003@gmail.com>
AuthorDate: Fri Feb 2 20:24:20 2018 +0100

    KAFKA-6354: Update KStream JavaDoc using new State Store API (#4456)
    
    Updates KStream JavaDoc and web page documentations using new State Store API
    
    Author: Yu Liu <yu.liu003@gmail.com>
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 docs/streams/developer-guide/processor-api.html    | 44 +++++++++-------------
 .../kafka/streams/kstream/KGroupedStream.java      |  8 ++--
 .../streams/kstream/SessionWindowedKStream.java    |  2 +-
 .../state/internals/InnerMeteredKeyValueStore.java |  2 +-
 4 files changed, 24 insertions(+), 32 deletions(-)

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index b92e85b..ecc5388 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -209,22 +209,18 @@
                         </ul>
                             <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span>
 <span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;persistent-counts&quot;.</span>
-<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
 
-<span class="c1">// Note: The `Stores` factory returns a supplier for the state store,</span>
-<span class="c1">// because that&#39;s what you typically need to pass as API parameter.</span>
-<span class="n">StateStoreSupplier</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
-  <span class="n">Stores</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">withKeys</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>
-    <span class="o">.</span><span class="na">withValues</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
-    <span class="o">.</span><span class="na">persistent</span><span class="o">()</span>
-    <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
+<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
+  <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
+    <span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;persistent-counts&quot;</span><span class="o">),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
+<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
 </pre></div>
                             </div>
-                            <p class="last">See
-                                <a class="reference external" href="../javadocs/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">PersistentKeyValueFactory</a> for
-                                detailed factory options.</p>
                         </td>
                     </tr>
                     <tr class="row-odd"><td>In-memory
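(For readers skimming the Pygments markup: the new persistent-store snippet above renders to the following plain Java. The imports for KeyValueStore and Serdes are added here to make the sketch self-contained; the docs example only shows the StoreBuilder and Stores imports.)

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // Using a `KeyValueStoreBuilder` to build a `KeyValueStore` named "persistent-counts".
    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
      Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("persistent-counts"),
        Serdes.String(),
        Serdes.Long());
    KeyValueStore<String, Long> countStore = countStoreSupplier.build();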
@@ -242,22 +238,18 @@
                         </ul>
                             <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span>
 <span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.</span>
-<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.processor.StateStoreSupplier</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.StoreBuilder</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.Stores</span><span class="o">;</span>
 
-<span class="c1">// Note: The `Stores` factory returns a supplier for the state store,</span>
-<span class="c1">// because that&#39;s what you typically need to pass as API parameter.</span>
-<span class="n">StateStoreSupplier</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
-  <span class="n">Stores</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">withKeys</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>
-    <span class="o">.</span><span class="na">withValues</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
-    <span class="o">.</span><span class="na">inMemory</span><span class="o">()</span>
-    <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+<span class="c1">// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.</span>
+<span class="n">StoreBuilder</span><span class="o">&lt;</span><span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countStoreSupplier</span> <span class="o">=</span>
+  <span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
+    <span class="n">Stores</span><span class="o">.</span><span class="na">inMemoryKeyValueStore</span><span class="o">(</span><span class="s">&quot;inmemory-counts&quot;</span><span class="o">),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
+    <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
+<span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">countStore</span> <span class="o">=</span> <span class="n">countStoreSupplier</span><span class="o">.</span><span class="na">build</span><span class="o">();</span>
 </pre></div>
                             </div>
-                            <p class="last">See
-                                <a class="reference external" href="../javadocs/org/apache/kafka/streams/state/Stores.InMemoryKeyValueFactory.html">InMemoryKeyValueFactory</a> for
-                                detailed factory options.</p>
                         </td>
                     </tr>
                     </tbody>
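(Decoded from the markup above, the in-memory snippet differs only in the inner store supplier; same serdes as in the docs example.)

    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
      Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("inmemory-counts"),
        Serdes.String(),
        Serdes.Long());
    KeyValueStore<String, Long> countStore = countStoreSupplier.build();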
@@ -332,8 +324,8 @@
                     The primary interface to implement for the store is
                     <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
                     as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p>
-                <p>You also need to provide a &#8220;factory&#8221; for the store by implementing the
-                    <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStoreSupplier</span></code> interface, which Kafka Streams uses to create instances of
+                <p>You also need to provide a &#8220;builder&#8221; for the store by implementing the
+                    <code class="docutils literal"><span class="pre">org.apache.kafka.streams.state.StoreBuilder</span></code> interface, which Kafka Streams uses to create instances of
                     your store.</p>
             </div>
         </div>
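(Context for the "builder" wording above: a StoreBuilder is normally handed to the topology rather than built by hand, and the processor then looks the store up by name. A rough sketch under that assumption; the topology, processor, and topic names are illustrative and not part of this commit.)

    Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    // MyCountProcessor is a hypothetical Processor<String, String> implementation.
    topology.addProcessor("Counter", MyCountProcessor::new, "Source");
    // Kafka Streams calls countStoreSupplier.build() and attaches the store to "Counter".
    topology.addStateStore(countStoreSupplier, "Counter");

Inside MyCountProcessor#init(ProcessorContext), the store would then be fetched with context.getStateStore("persistent-counts").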
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 6347960..29de64c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -131,7 +131,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * To query the local {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
-     * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
+     * Use {@link org.apache.kafka.streams.processor.StateStore#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
      * String queryableStoreName = storeSupplier.name();
@@ -154,7 +154,7 @@ public interface KGroupedStream<K, V> {
      * Count the number of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * provided by the given {@code storeSupplier}.
+     * provided by the given {@code materialized}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -576,7 +576,7 @@ public interface KGroupedStream<K, V> {
      * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * provided by the given {@code storeSupplier}.
+     * provided by the given {@code materialized}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
@@ -592,7 +592,7 @@ public interface KGroupedStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
      * max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
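(In practice the {@code materialized} parameter referenced above is used roughly like this; names are illustrative, and `builder` / `streams` are an existing StreamsBuilder and running KafkaStreams instance, neither shown in the commit.)

    KTable<String, Long> counts = builder.<String, String>stream("words-topic")
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

    // Query the local state store by the name given to Materialized:
    ReadOnlyKeyValueStore<String, Long> view =
        streams.store("counts-store", QueryableStoreTypes.keyValueStore());
    Long count = view.get("hello");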
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index ddc0371..efc60ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -230,7 +230,7 @@ public interface SessionWindowedKStream<K, V> {
      * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
      * sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
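(The session-windowed {@code reduce(Reducer, Materialized)} mentioned above would look roughly like this; a sketch against the 1.x API, where `stream` is a KStream<String, String> and the store name and five-minute inactivity gap are illustrative.)

    KTable<Windowed<String>, String> sessions = stream
        .groupByKey()
        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
        .reduce(
            (v1, v2) -> v1 + v2,
            Materialized.<String, String, SessionStore<Bytes, byte[]>>as("sessionized-reduce"));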
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index a34851a..d076a05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -55,7 +55,7 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     /**
      * For a period of time we will have 2 store hierarchies. 1 which is built by a
-     * {@link org.apache.kafka.streams.processor.StateStoreSupplier} where the outer most store will be of user defined
+     * {@link org.apache.kafka.streams.state.StoreSupplier} where the outer most store will be of user defined
      * type, i.e, &lt;String,Integer&gt;, and another where the outermost store will be of type &lt;Bytes,byte[]&gt;
      * This interface is so we don't need to have 2 complete implementations for collecting the metrics, rather
      * we just provide an instance of this to do the type conversions from the outer store types to the inner store types.
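(To illustrate the two hierarchies the comment above describes, the public API exposes both views: the StoreSupplier side works on <Bytes, byte[]> while the builder wraps it with the user-defined types. A sketch only; the built stores would still need to be initialized by the runtime before use.)

    KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("raw-counts");
    KeyValueStore<Bytes, byte[]> inner = supplier.get();   // inner hierarchy: <Bytes, byte[]>
    KeyValueStore<String, Long> outer =                    // outer hierarchy: user-defined types
        Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long()).build();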

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.
