Date: Wed, 27 Sep 2017 02:22:00 +0000 (UTC)
From: "Richard Yu (JIRA)"
To: jira@kafka.apache.org
Reply-To: jira@kafka.apache.org
Subject: [jira] [Comment Edited] (KAFKA-4499) Add "getAllKeys" API for querying windowed KTable stores

[ https://issues.apache.org/jira/browse/KAFKA-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181875#comment-16181875 ]

Richard Yu edited comment on KAFKA-4499 at 9/27/17 2:21 AM:
------------------------------------------------------------

I noticed a couple of things when looking through the {{ReadOnlyWindowStore}} interface. {{all(long timeFrom, long timeTo)}} could be implemented easily, because the existing {{fetch}} method does almost exactly what [~mjsax] mentioned above, except that it is restricted to a given range of keys:

{code}
/**
 * Get all the key-value pairs in the given key range and time range from all
 * the existing windows.
 *
 * @param from      the first key in the range
 * @param to        the last key in the range
 * @param timeFrom  time range start (inclusive)
 * @param timeTo    time range end (inclusive)
 * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
 * @throws InvalidStateStoreException if the store is not initialized
 * @throws NullPointerException If null is used for any key.
 */
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
{code}

Notice the wording of the description: "Get all the key-value pairs in the given key range and time range from all the existing windows." Thus, {{all(long timeFrom, long timeTo)}} could simply call the above {{fetch}} method with {{from}} and {{to}} set to the minimum and maximum keys respectively (e.g. {{return fetch(key_min, key_max, timeFrom, timeTo)}}). The {{allLatest()}} method would then only have to check for and discard the non-latest windows of each key.

The problem, however, is finding the minimum and maximum keys, which in effect is the same problem the {{keys()}} method must solve. I have found a way around this, although it might only apply to {{CachingWindowStore}}. One of its fields is a {{ThreadCache}} instance, which has the following method:

{code}
public MemoryLRUCacheBytesIterator all(final String namespace) {
    final NamedCache cache = getCache(namespace);
    if (cache == null) {
        return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
    }
    return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}
{code}

This {{all(...)}} method could be called from the implementation to return all the keys. For convenience, there should also be {{minKey()}} and {{maxKey()}} methods that return the minimum and maximum keys (perhaps by calling {{all()}} and checking every key).

Is this the correct approach towards adding this new feature?

was (Author: yohan123):

I noticed a couple of things when looking through the {{ReadOnlyWindowStore}} interface. {{all(long timeFrom, long timeTo)}} could be implemented easily, because the existing {{fetch}} method does almost exactly what [~mjsax] mentioned above, except that it is restricted to a given range of keys:

{code}
/**
 * Get all the key-value pairs in the given key range and time range from all
 * the existing windows.
 *
 * @param from      the first key in the range
 * @param to        the last key in the range
 * @param timeFrom  time range start (inclusive)
 * @param timeTo    time range end (inclusive)
 * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
 * @throws InvalidStateStoreException if the store is not initialized
 * @throws NullPointerException If null is used for any key.
 */
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
{code}

Notice the wording of the description: "Get all the key-value pairs in the given key range and time range from all the existing windows." Thus, {{all(long timeFrom, long timeTo)}} could simply call the above {{fetch}} method with {{from}} and {{to}} set to the minimum and maximum keys respectively (e.g. {{return fetch(key_min, key_max, from, to)}}). The {{allLatest()}} method would then only have to check for and discard the non-latest windows of each key.

The problem, however, is finding the minimum and maximum keys, which in effect is the same problem the {{keys()}} method must solve. I have found a way around this, although it might only apply to {{CachingWindowStore}}. One of its fields is a {{ThreadCache}} instance, which has the following method:

{code}
public MemoryLRUCacheBytesIterator all(final String namespace) {
    final NamedCache cache = getCache(namespace);
    if (cache == null) {
        return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
    }
    return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}
{code}

This {{all(...)}} method could be called from the implementation to return all the keys. For convenience, there should also be {{minKey()}} and {{maxKey()}} methods that return the minimum and maximum keys (perhaps by calling {{all()}} and checking every key).

Is this the correct approach towards adding this new feature?

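The delegation idea in the comment can be illustrated with a small, self-contained sketch. It is not Kafka code: {{WindowStoreSketch}}, its {{Entry}} type, the {{String}} keys and {{long}} values are all hypothetical, and a sorted in-memory map stands in for the real segmented store, so finding the minimum and maximum key is trivial here in a way it is not for the actual window store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a window store; NOT Kafka's implementation. */
public class WindowStoreSketch {

    /** One stored entry: a key, the start timestamp of its window, and a value. */
    public static final class Entry {
        public final String key;
        public final long windowStart;
        public final long value;

        public Entry(String key, long windowStart, long value) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    // key -> (window start -> value); TreeMap keeps both keys and windows sorted.
    private final TreeMap<String, TreeMap<Long, Long>> data = new TreeMap<>();

    public void put(String key, long windowStart, long value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    /** Smallest stored key, or null if the store is empty. */
    public String minKey() {
        return data.isEmpty() ? null : data.firstKey();
    }

    /** Largest stored key, or null if the store is empty. */
    public String maxKey() {
        return data.isEmpty() ? null : data.lastKey();
    }

    /** Same shape as fetch(from, to, timeFrom, timeTo): both ranges are inclusive. */
    public List<Entry> fetch(String from, String to, long timeFrom, long timeTo) {
        List<Entry> result = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey
                : data.subMap(from, true, to, true).entrySet()) {
            for (Map.Entry<Long, Long> window
                    : perKey.getValue().subMap(timeFrom, true, timeTo, true).entrySet()) {
                result.add(new Entry(perKey.getKey(), window.getKey(), window.getValue()));
            }
        }
        return result;
    }

    /** all(timeFrom, timeTo) expressed as a fetch over the full key range. */
    public List<Entry> all(long timeFrom, long timeTo) {
        if (data.isEmpty()) {
            return Collections.emptyList();
        }
        return fetch(minKey(), maxKey(), timeFrom, timeTo);
    }

    /** allLatest(): keep only the window with the largest start time for each key. */
    public List<Entry> allLatest() {
        List<Entry> result = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : data.entrySet()) {
            Map.Entry<Long, Long> latest = perKey.getValue().lastEntry();
            result.add(new Entry(perKey.getKey(), latest.getKey(), latest.getValue()));
        }
        return result;
    }
}
```

In this sketch {{all}} is a thin wrapper around {{fetch}}, so the only extra work is locating the key bounds, which matches the difficulty raised in the comment.
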
> Add "getAllKeys" API for querying windowed KTable stores
> --------------------------------------------------------
>
>                 Key: KAFKA-4499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4499
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>              Labels: needs-kip
>
> Currently, both {{KTable}} and windowed-{{KTable}} stores can be queried via the IQ feature. While {{ReadOnlyKeyValueStore}} (for {{KTable}} stores) provides the method {{all()}} to scan the whole store (i.e., it returns an iterator over all stored key-value pairs), there is no similar API for {{ReadOnlyWindowStore}} (for windowed-{{KTable}} stores).
> This limits the usage of a windowed store, because the user needs to know which keys are stored in order to query it. It would be useful to provide APIs like the following (only a rough sketch):
>  - {{keys()}} that returns all keys available in the store (maybe together with the available time ranges)
>  - {{all(long timeFrom, long timeTo)}} that returns all windows for a specific time range
>  - {{allLatest()}} that returns the latest window for each key
> Because this feature would require scanning multiple segments (i.e., RocksDB instances), it would be quite inefficient with the current store design. Thus, this feature also requires redesigning the underlying window store itself.
> Because this is a major change, a KIP (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) is required. The KIP should cover the actual API design as well as the store refactoring.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)