Date: Sat, 26 Aug 2017 17:57:00 +0000 (UTC)
From: "Richard Yu (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Comment Edited] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142877#comment-16142877 ]

Richard Yu edited comment on KAFKA-4468 at 8/26/17 5:56 PM:
------------------------------------------------------------

Looking through the references to WindowedDeserializer, it appears that it is not used anywhere other than in unit tests. The other approach is to look through the window store classes for the window size. Since RocksDBWindowStore is the most commonly used implementation, I looked through that class and found that, when it creates the windows it returns, it relies on the class WindowStoreIteratorWrapper.
Lines 54-58 of that class contain the following:

{code}
@Override
public Windowed<Bytes> peekNextKey() {
    final Bytes next = bytesIterator.peekNextKey();
    // extract the window start timestamp and the record key from the stored binary key
    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
    // windowSize is already known to the wrapper, so the window end can be derived from the start
    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
}
{code}

In this method, {{windowSize}} is already known to the class, so it can correctly build a window of limited, fixed length: the end timestamp follows from the start timestamp plus the fixed size. By the same reasoning, for WindowedDeserializer to compute the end timestamp, the window length would have to be available somewhere in the chain of calls through which WindowedDeserializer is invoked. However, to date I could not find such a chain of calls between the window stores and WindowedDeserializer. In other words, there might not be a practical way for the deserializer to retrieve the length of the window.

> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the start timestamp of the window as part of the combo-key as (start-timestamp, key). The reason we do not add the end timestamp as well is that we can always calculate it as the start timestamp + window_length, and hence we save 8 bytes per key on the persistent KV store.
> However, after reading it (via {{WindowedDeserializer}}) we do not set its end timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix this by calculating its end timestamp as mentioned above.

-- 
This message was sent by Atlassian JIRA
(v6.4.14#64029)
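The calculation the issue asks for can be sketched in plain Java: if whatever decodes the stored combo-key is handed the window size up front (the piece the comment above could not find a path to), it can rebuild the end timestamp as start + window length instead of falling back to an unlimited window. This is a minimal standalone sketch, not Kafka's code. `SketchWindow`, `WindowedKeyCodec` and `WindowedKeyCodecDemo` are hypothetical names, and the byte layout (an 8-byte start timestamp followed by the key bytes) and the overflow clamping are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for a bounded time window [start, end); not Kafka's TimeWindow.
final class SketchWindow {
    final long start;
    final long end;

    SketchWindow(long start, long end) {
        if (end < start) {
            throw new IllegalArgumentException("end must not be before start");
        }
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

// Hypothetical codec: persists only (start-timestamp, key), as the issue describes,
// and recomputes the end timestamp from a window size it is given at construction time.
final class WindowedKeyCodec {
    private static final int TIMESTAMP_SIZE = Long.BYTES; // the 8 bytes saved per stored key

    private final long windowSizeMs;

    WindowedKeyCodec(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    // Assumed layout: [8-byte big-endian start timestamp][key bytes]; no end timestamp is written.
    byte[] encode(long windowStartMs, byte[] key) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE + key.length)
                .putLong(windowStartMs)
                .put(key)
                .array();
    }

    // Everything after the timestamp prefix is the record key.
    byte[] keyOf(byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, TIMESTAMP_SIZE, binaryKey.length);
    }

    // Rebuild the window as [start, start + windowSize) instead of an unlimited window.
    SketchWindow windowOf(byte[] binaryKey) {
        long start = ByteBuffer.wrap(binaryKey).getLong(0);
        long end = start + windowSizeMs;
        if (end < start) {
            end = Long.MAX_VALUE; // addition overflowed: clamp (a choice made for this sketch)
        }
        return new SketchWindow(start, end);
    }
}

class WindowedKeyCodecDemo {
    public static void main(String[] args) {
        WindowedKeyCodec codec = new WindowedKeyCodec(60_000L); // assumed 1-minute windows
        byte[] stored = codec.encode(1_503_769_020_000L, "user-42".getBytes(StandardCharsets.UTF_8));
        String key = new String(codec.keyOf(stored), StandardCharsets.UTF_8);
        // prints "user-42 [1503769020000, 1503769080000)"
        System.out.println(key + " " + codec.windowOf(stored));
    }
}
```

The design choice that matters is passing the window size into the decoder's constructor. That mirrors how WindowStoreIteratorWrapper already has {{windowSize}} at hand when it calls {{timeWindowForSize}}, while the stored key stays 8 bytes smaller than it would be with an explicit end timestamp.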