kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
Date Thu, 11 Jan 2018 02:29:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321583#comment-16321583 ]

ASF GitHub Bot commented on KAFKA-6265:
---------------------------------------

ConcurrencyPractitioner closed pull request #4340: [KAFKA-6265] GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4340
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 5730f5330e1..20d01f2c148 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -2294,22 +2294,22 @@ <h4><a id="streams_developer-guide_interactive-queries_local-window-stores" href
     </p>
 
     <pre class="brush: java;">
-          // Get the window store named "CountsWindowStore"
-          ReadOnlyWindowStore&lt;String, Long&gt; windowStore =
-              streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
-
-          // Fetch values for the key "world" for all of the windows available in this application instance.
-          // To get *all* available windows we fetch windows from the beginning of time until now.
-          long timeFrom = 0; // beginning of time = oldest available
-          long timeTo = System.currentTimeMillis(); // now (in processing-time)
-          WindowStoreIterator&lt;Long&gt; iterator = windowStore.fetch("world", timeFrom, timeTo);
-          while (iterator.hasNext()) {
+         // Get the window store named "CountsWindowStore"
+         ReadOnlyWindowStore&lt;String, Long&gt; windowStore =
+             streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
+
+         // Fetch values for the key "world" for all of the windows available in this application instance.
+         // To get *all* available windows we fetch windows from the beginning of time until now.
+         long timeFrom = 0; // beginning of time = oldest available
+         long timeTo = System.currentTimeMillis(); // now (in processing-time)
+         WindowStoreIterator&lt;Long&gt; iterator = windowStore.fetch("world", timeFrom, timeTo);
+         while (iterator.hasNext()) {
             KeyValue&lt;Long, Long&gt; next = iterator.next();
             long windowTimestamp = next.key;
             System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
-          }
-          iterator.close();
-        </pre>
+         }
+         iterator.close();
+    </pre>
 
     <h4><a id="streams_developer-guide_interactive-queries_custom-stores" href="#streams_developer-guide_interactive-queries_custom-stores">Querying local custom state stores</a></h4>
     <p>
@@ -3023,4 +3023,4 @@ <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
-</script>
\ No newline at end of file
+</script>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 297405802ca..cd5ad433159 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -44,6 +44,15 @@ <h1>Upgrade Guide &amp; API Changes</h1>
         See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
+
+    <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
+    <p>
+	New method in <code>GlobalKTable</code>
+    </p>
+    <ul>
+	<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
+    </ul>
+
     <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
 
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c20529..e58f67fc5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@
  */
 @InterfaceStability.Evolving
 public interface GlobalKTable<K, V> {
+    /**
+     * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+     *
+     * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+     */
+    String queryableStoreName();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e23752444..8fcdfed1e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@
 public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
 
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+    private final boolean queryable;
 
     public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
         this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = true;
+    }
+    
+    public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
+                            final boolean queryable) {
+        this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = queryable;
     }
 
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         return valueGetterSupplier;
     }
 
+    @Override
+    public String queryableStoreName() {
+        if (!queryable) {
+            return null;
+        }
+        return valueGetterSupplier.storeNames()[0];
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..2a8a89e1f0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -158,7 +158,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
                                                topic,
                                                processorName,
                                                tableSource);
-        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 05a02140c35..9526f60e2c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -156,6 +156,32 @@ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() thro
         assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
+    
+    @Test
+    public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+        final GlobalKTable<String, String> table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+                builder,
+                storePrefix));
+
+        assertNull(table1.queryableStoreName());
+    }
+
+    @Test
+    public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+        final GlobalKTable<String, String> table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+                builder,
+                storePrefix));
+
+        assertEquals("globalTable", table1.queryableStoreName());
+    }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {
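
The behaviour this diff adds can be sketched in isolation. What follows is a minimal, self-contained Java sketch and not the Kafka code itself: StoreNameSupplier, GlobalTableSketch and describe() are hypothetical stand-ins invented for illustration (loosely mirroring KTableValueGetterSupplier and GlobalKTableImpl), and only the "null if not queryable, else the first store name" logic is taken from the patch.

```java
public class QueryableStoreNameSketch {

    /** Hypothetical stand-in for KTableValueGetterSupplier; only storeNames() is mirrored. */
    interface StoreNameSupplier {
        String[] storeNames();
    }

    /** Hypothetical stand-in for GlobalKTableImpl, reduced to the logic added in the diff. */
    static final class GlobalTableSketch {
        private final StoreNameSupplier supplier;
        private final boolean queryable;

        GlobalTableSketch(final StoreNameSupplier supplier, final boolean queryable) {
            this.supplier = supplier;
            this.queryable = queryable;
        }

        /** Returns the first underlying store name, or null if the table is not queryable. */
        String queryableStoreName() {
            if (!queryable) {
                return null;
            }
            return supplier.storeNames()[0];
        }
    }

    /** Illustrative caller: the null return must be checked before the name is used. */
    static String describe(final GlobalTableSketch table) {
        final String name = table.queryableStoreName();
        return name == null ? "not queryable" : "queryable via store '" + name + "'";
    }

    public static void main(final String[] args) {
        // A table backed by a store the user named explicitly.
        final GlobalTableSketch named =
            new GlobalTableSketch(() -> new String[] {"globalTable"}, true);
        // A table whose store only has an internal, generated name (made up here).
        final GlobalTableSketch internal =
            new GlobalTableSketch(() -> new String[] {"generated-internal-name"}, false);

        System.out.println(describe(named));
        System.out.println(describe(internal));
    }
}
```

Because the method signals "not queryable" with null rather than an exception, callers are expected to null-check the result before passing it on, as describe() does here.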


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> GlobalKTable missing #queryableStoreName()
> ------------------------------------------
>
>                 Key: KAFKA-6265
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6265
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Antony Stubbs
>            Assignee: Richard Yu
>              Labels: beginner, needs-kip, newbie
>             Fix For: 1.1.0
>
>
> KTable has the useful #queryableStoreName() method, but it seems to be missing from GlobalKTable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
