From: "ASF GitHub Bot (JIRA)"
To: jira@kafka.apache.org
Reply-To: jira@kafka.apache.org
Date: Thu, 11 Jan 2018 17:57:00 +0000 (UTC)
Subject: [jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 7bit

    [ https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322667#comment-16322667 ]

ASF GitHub Bot commented on KAFKA-6265:
---------------------------------------

guozhangwang closed pull request #4413: [KAFKA-6265] GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4413

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below
for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 43038118692..c68b9bf4333 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -71,6 +71,13 @@

Streams API to distinguish them from configurations of other clients that share the same config names.

+ New method in GlobalKTable
+   • A method has been provided such that it will return the store name associated with the GlobalKTable or null if the store name is non-queryable.
+

New methods in KafkaStreams:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c20529..e58f67fc5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@
  */
 @InterfaceStability.Evolving
 public interface GlobalKTable {
+    /**
+     * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+     *
+     * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+     */
+    String queryableStoreName();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e23752444..8fcdfed1e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@
 public class GlobalKTableImpl implements GlobalKTable {
 
     private final KTableValueGetterSupplier valueGetterSupplier;
+    private final boolean queryable;
 
     public GlobalKTableImpl(final KTableValueGetterSupplier valueGetterSupplier) {
         this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = true;
+    }
+
+    public GlobalKTableImpl(final KTableValueGetterSupplier valueGetterSupplier,
+                            final boolean queryable) {
+        this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = queryable;
     }
 
     KTableValueGetterSupplier valueGetterSupplier() {
         return valueGetterSupplier;
     }
 
+    @Override
+    public String queryableStoreName() {
+        if (!queryable) {
+            return null;
+        }
+        return valueGetterSupplier.storeNames()[0];
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d0c50..2a8a89e1f0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -158,7 +158,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
                 topic,
                 processorName,
                 tableSource);
-        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier(storeBuilder.name()));
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier(storeBuilder.name()), materialized.isQueryable());
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 156acadbedd..b9ba6089497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -156,6 +156,32 @@ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() thro
         assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
+
+    @Test
+    public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+        final GlobalKTable table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.>with(null, null),
+                builder,
+                storePrefix));
+
+        assertNull(table1.queryableStoreName());
+    }
+
+    @Test
+    public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+        final GlobalKTable table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.>as("globalTable"),
+                builder,
+                storePrefix));
+
+        assertEquals("globalTable", table1.queryableStoreName());
+    }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> GlobalKTable missing #queryableStoreName()
> ------------------------------------------
>
>                 Key: KAFKA-6265
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6265
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Antony Stubbs
>            Assignee: Richard Yu
>              Labels: beginner, needs-kip, newbie
>             Fix For: 1.1.0
>
>
> KTable has the nicely useful #queryableStoreName(), it seems to be missing from GlobalKTable



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
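
For readers who want to see the name-or-null contract from the patch in isolation, here is a minimal sketch that runs without Kafka on the classpath. It is not the Kafka implementation: QueryableStoreNameSketch, StoreNameSupplier, GlobalTable and describe are hypothetical stand-ins invented for illustration, and only the queryable-flag check mirrors the GlobalKTableImpl change in the diff.

```java
final class QueryableStoreNameSketch {

    /** Hypothetical stand-in for the internal value-getter supplier; only storeNames() is modeled. */
    interface StoreNameSupplier {
        String[] storeNames();
    }

    /** Dependency-free mirror of the queryable-flag logic added to GlobalKTableImpl in the patch. */
    static final class GlobalTable {
        private final StoreNameSupplier supplier;
        private final boolean queryable;

        GlobalTable(final StoreNameSupplier supplier, final boolean queryable) {
            this.supplier = supplier;
            this.queryable = queryable;
        }

        /** Returns the first backing store name, or null when the table was not made queryable. */
        String queryableStoreName() {
            if (!queryable) {
                return null;
            }
            return supplier.storeNames()[0];
        }
    }

    /** Caller-side null check (hypothetical helper): decide whether the store can be looked up by name. */
    static String describe(final GlobalTable table) {
        final String name = table.queryableStoreName();
        return name == null ? "not queryable" : "queryable via store '" + name + "'";
    }

    public static void main(final String[] args) {
        // A table materialized under an explicit name exposes that name.
        final GlobalTable named = new GlobalTable(() -> new String[] {"globalTable"}, true);
        // A table with only an internal store keeps the store but hides its name.
        final GlobalTable unnamed = new GlobalTable(() -> new String[] {"internal-store"}, false);

        System.out.println(describe(named));
        System.out.println(describe(unnamed));
    }
}
```

The point of the sketch is that the store still exists in both cases; the flag only controls whether its name is handed to callers, which is why code using the method should check for null before attempting a lookup by name.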