Date: Thu, 11 Jan 2018 17:57:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
[ https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322667#comment-16322667 ]
ASF GitHub Bot commented on KAFKA-6265:
---------------------------------------
guozhangwang closed pull request #4413: [KAFKA-6265] GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4413
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 43038118692..c68b9bf4333 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -71,6 +71,13 @@
+
+ New method in GlobalKTable
+
+
+ - A method has been provided such that it will return the store name associated with the GlobalKTable or null if the store name is non-queryable.
+
+
New methods in KafkaStreams:
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c20529..e58f67fc5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@
*/
@InterfaceStability.Evolving
public interface GlobalKTable {
+ /**
+ * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+ *
+ * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+ */
+ String queryableStoreName();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e23752444..8fcdfed1e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@
public class GlobalKTableImpl implements GlobalKTable {
private final KTableValueGetterSupplier valueGetterSupplier;
+ private final boolean queryable;
public GlobalKTableImpl(final KTableValueGetterSupplier valueGetterSupplier) {
this.valueGetterSupplier = valueGetterSupplier;
+ this.queryable = true;
+ }
+
+ public GlobalKTableImpl(final KTableValueGetterSupplier valueGetterSupplier,
+ final boolean queryable) {
+ this.valueGetterSupplier = valueGetterSupplier;
+ this.queryable = queryable;
}
KTableValueGetterSupplier valueGetterSupplier() {
return valueGetterSupplier;
}
+ @Override
+ public String queryableStoreName() {
+ if (!queryable) {
+ return null;
+ }
+ return valueGetterSupplier.storeNames()[0];
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..2a8a89e1f0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -158,7 +158,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
topic,
processorName,
tableSource);
- return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier(storeBuilder.name()));
+ return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier(storeBuilder.name()), materialized.isQueryable());
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 156acadbedd..b9ba6089497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -156,6 +156,32 @@ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() thro
assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
+
+ @Test
+ public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+ final GlobalKTable table1 = builder.globalTable(
+ "topic2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+ builder,
+ storePrefix));
+
+ assertNull(table1.queryableStoreName());
+ }
+
+ @Test
+ public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+ final GlobalKTable table1 = builder.globalTable(
+ "topic2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+ builder,
+ storePrefix));
+
+ assertEquals("globalTable", table1.queryableStoreName());
+ }
@Test
public void shouldBuildSimpleGlobalTableTopology() throws Exception {
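
For readers who want to try the contract in isolation, here is a minimal sketch of the behaviour the diff above adds: queryableStoreName() returns the store name when the table was materialized as queryable, and null otherwise. It does not use the Kafka Streams classes. NamedTable, TableSketch and storeNameOf are hypothetical stand-ins invented for this illustration, and wrapping the result in Optional is one possible caller-side choice, not something the PR prescribes.

```java
import java.util.Optional;

public class QueryableStoreNameSketch {

    /** Hypothetical stand-in for the GlobalKTable interface; not the Kafka type. */
    interface NamedTable {
        /** Returns the store name, or null if the table cannot be queried. */
        String queryableStoreName();
    }

    /** Hypothetical stand-in that mirrors the shape of GlobalKTableImpl in the diff. */
    static final class TableSketch implements NamedTable {
        private final String[] storeNames;
        private final boolean queryable;

        TableSketch(final String storeName, final boolean queryable) {
            this.storeNames = new String[] {storeName};
            this.queryable = queryable;
        }

        @Override
        public String queryableStoreName() {
            // Same rule as the diff: hide the name when the store is not queryable.
            if (!queryable) {
                return null;
            }
            return storeNames[0];
        }
    }

    /** Caller-side helper (an assumption, not part of the PR): make the null explicit. */
    static Optional<String> storeNameOf(final NamedTable table) {
        return Optional.ofNullable(table.queryableStoreName());
    }

    public static void main(final String[] args) {
        final NamedTable named = new TableSketch("globalTable", true);
        final NamedTable internal = new TableSketch("internal-generated-name", false);

        System.out.println(storeNameOf(named).orElse("<not queryable>"));
        System.out.println(storeNameOf(internal).orElse("<not queryable>"));
    }
}
```

The store still has a name internally in the non-queryable case. The method simply declines to expose it, which is why a caller should check for null before using the result.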
----------------------------------------------------------------
> GlobalKTable missing #queryableStoreName()
> ------------------------------------------
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Antony Stubbs
> Assignee: Richard Yu
> Labels: beginner, needs-kip, newbie
> Fix For: 1.1.0
>
>
> KTable has the useful #queryableStoreName() method, but it seems to be missing from GlobalKTable.