From: Aljoscha Krettek
Date: Wed, 19 Oct 2016 09:28:31 +0000
Subject: Re: Add MapState for keyed streams
To: dev@flink.apache.org
Hi,

just making sure I understand this correctly. Would the MapState keys be the same as the key provided when creating the KeyedStream, or a different key? As an example, would it be like this:

    DataStream<Tuple2<...>> input = ...;
    KeyedStream keyed = input.keyBy(0);
    keyed.map((Tuple2 input) -> mapState.put(input.f0, input.f1));

or like this:

    DataStream<Tuple3<...>> input = ...;
    KeyedStream keyed = input.keyBy(0);
    keyed.map((Tuple3 input) -> mapState.put(input.f1, input.f2));
    // <- notice how the key for the MapState is different from the key of the KeyedStream

Cheers,
Aljoscha

On Wed, 19 Oct 2016 at 10:55 Till Rohrmann wrote:

Hi Xiaogang,

I really like your proposal and think that this would be a valuable addition to Flink :-)

For convenience we could maybe add contains(K key), too.

Java's Map interface returns a Set of Entry when calling entrySet() (which is the equivalent of iterator() in our interface). The Entry interface not only gives access to the key and value of the map entry but also allows setting a value for the respective key via setValue (even though it's an optional operation). Maybe we want to offer something similar when giving access to the entry set via the iterator method.

Cheers,
Till

On Wed, Oct 19, 2016 at 4:18 AM, SHI Xiaogang wrote:

> Hi, all.
>
> I created the JIRA https://issues.apache.org/jira/browse/FLINK-4856 to
> propose adding MapStates into Flink.
>
> MapStates are very useful in our daily jobs. For example, when implementing
> DistinctCount, we store the values into a MapState, and the result of each
> group (key) is exactly the number of entries in the MapState.
>
> In my opinion, the methods provided by the MapState may include:
> * void put(K key, V value)
> * V get(K key)
> * Iterable<K> keys()
> * Iterable<V> values()
> * Iterator<Map.Entry<K, V>> iterator()
>
> Do you have any comments? Any feedback is appreciated.
>
> Xiaogang
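For illustration, the methods listed in the proposal, plus the contains(K key) method Till suggests, could be written down as a plain Java interface. This is a minimal in-memory sketch, not Flink's API: the names ProposedMapState and HeapMapStateSketch are hypothetical, and the HashMap backing is an assumption chosen so that Entry.setValue on entries returned by iterator() writes through to the map, which is the java.util.Map behavior Till refers to.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical interface mirroring the proposed methods plus contains(K).
// Not part of Flink; names are illustrative only.
interface ProposedMapState<K, V> {
    void put(K key, V value);
    V get(K key);
    boolean contains(K key);
    Iterable<K> keys();
    Iterable<V> values();
    Iterator<Map.Entry<K, V>> iterator();
}

// In-memory sketch backed by a HashMap (an assumption for illustration).
// iterator() hands out the HashMap's own entries, so Entry.setValue
// updates the stored value, like java.util.Map.entrySet() does.
class HeapMapStateSketch<K, V> implements ProposedMapState<K, V> {
    private final Map<K, V> entries = new HashMap<>();

    @Override public void put(K key, V value) { entries.put(key, value); }
    @Override public V get(K key) { return entries.get(key); }
    @Override public boolean contains(K key) { return entries.containsKey(key); }
    @Override public Iterable<K> keys() { return entries.keySet(); }
    @Override public Iterable<V> values() { return entries.values(); }
    @Override public Iterator<Map.Entry<K, V>> iterator() {
        return entries.entrySet().iterator();
    }
}
```

A state backend that stores entries off-heap or serialized would need extra work to support a write-through setValue, which is one reason it may make sense to treat it as optional, as java.util.Map does.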
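The DistinctCount use case from the proposal can be sketched without Flink: keep one map per group key, put every incoming value into that group's map, and report the map's size. The class DistinctCountSketch is hypothetical; a per-group HashMap stands in for a MapState scoped to the current key, under the assumption that the state is partitioned by the group key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical DistinctCount sketch: one map per group key stands in for a
// MapState scoped to that group; the map's keys are the distinct values seen.
class DistinctCountSketch<G, T> {
    // group key -> (distinct value -> presence marker)
    private final Map<G, Map<T, Boolean>> stateByGroup = new HashMap<>();

    // Records a value for a group and returns that group's distinct count.
    int add(G group, T value) {
        Map<T, Boolean> seen = stateByGroup.computeIfAbsent(group, g -> new HashMap<>());
        seen.put(value, Boolean.TRUE); // re-adding a known value leaves the size unchanged
        return seen.size();            // the result is the number of entries
    }
}
```

The values are stored as map keys only; the Boolean payload is just a placeholder, so the count never needs to scan the map.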
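To make the two readings in Aljoscha's question concrete, here is a plain-Java model, assuming that each stream key (the key passed to keyBy) owns its own map. The class KeyedMapStateModel is hypothetical. Under that assumption, if the map key is always the stream key (first example), each per-key map holds at most one entry; if the map key comes from another field (second example), one stream key can accumulate many entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each stream key owns a separate map, as if a MapState
// were scoped to the current key of the KeyedStream.
class KeyedMapStateModel<SK, MK, V> {
    private final Map<SK, Map<MK, V>> perStreamKey = new HashMap<>();

    // Writes mapKey -> value into the map belonging to streamKey.
    void put(SK streamKey, MK mapKey, V value) {
        perStreamKey.computeIfAbsent(streamKey, k -> new HashMap<>()).put(mapKey, value);
    }

    // Number of entries currently held for one stream key (0 if none).
    int size(SK streamKey) {
        Map<MK, V> m = perStreamKey.get(streamKey);
        return m == null ? 0 : m.size();
    }
}
```

In the first reading, put("a", "a", ...) repeatedly overwrites a single entry; in the second, put("a", "x", ...) and put("a", "y", ...) leave two entries under stream key "a".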