Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 05382200D00 for ; Sun, 10 Sep 2017 14:40:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EE8F41609B7; Sun, 10 Sep 2017 12:40:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 189091609B4 for ; Sun, 10 Sep 2017 14:40:06 +0200 (CEST) Received: (qmail 79221 invoked by uid 500); 10 Sep 2017 12:40:05 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 79208 invoked by uid 99); 10 Sep 2017 12:40:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Sep 2017 12:40:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 13381183A63 for ; Sun, 10 Sep 2017 12:40:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.48 X-Spam-Level: ** X-Spam-Status: No, score=2.48 tagged_above=-999 required=6.31 tests=[HEADER_FROM_DIFFERENT_DOMAINS=0.001, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id YyLehXVz2REX for ; Sun, 10 Sep 2017 12:40:03 +0000 (UTC) Received: from mail-lf0-f51.google.com (mail-lf0-f51.google.com [209.85.215.51]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 0937B5F3CD for ; Sun, 10 Sep 2017 12:40:03 +0000 (UTC) Received: by mail-lf0-f51.google.com with SMTP id 80so13117560lfy.4 for ; Sun, 10 Sep 2017 05:40:02 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:from:date:message-id:subject:to; bh=MPmiNYd6LQPLkVErkobSG1K6tA8RmuVvdj22yScplBE=; b=kdMG1QBW4Hug7tqToLjNj5y8yg5+DWqo5+M5iykwabuMUxrzV+zHGrWB9X9r8WGi8R G+p/RFY1fh82sHiEzfRDZPlOul8xmRjmS0Yyb2iYlkEFmdhqKvuRfpbzEGebsJVjIhSu XAnizXzRM0q5Wj5T8zfZ1qb6NvWZtIUCtg6gpVglmc3Om0pedcKwVW2+QlOT5vE/jnaS rmpWSAwi7/rWv1Vdf1dIQKnYDEQHLoYD9Qd7rq6Fq3ULUtHc0MSl405cqp7iGT1MXVtE AZlsnHY3y2jggUMZPnZ88ksFIeYVzvBL58VAkqv1mfNfAPzB/sK32Q3mMpfMlJx7CRkZ u85Q== X-Gm-Message-State: AHPjjUjIkFTVDNkRsbcfnBQJXqYzIzLkqGTfJSSTHwZWDLbbhh9e3cyK kSBAVG+8NTldNIKpFbwJ+w== X-Received: by 10.25.209.202 with SMTP id i193mr2339734lfg.97.1505047195876; Sun, 10 Sep 2017 05:39:55 -0700 (PDT) Received: from mail-lf0-f49.google.com (mail-lf0-f49.google.com. [209.85.215.49]) by smtp.gmail.com with ESMTPSA id v9sm1265960lja.61.2017.09.10.05.39.55 for (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Sun, 10 Sep 2017 05:39:55 -0700 (PDT) Received: by mail-lf0-f49.google.com with SMTP id m199so13110177lfe.3 for ; Sun, 10 Sep 2017 05:39:55 -0700 (PDT) X-Google-Smtp-Source: ADKCNb77UdoB9iGPBWT8PN775PRWgLFmAy2MUIDZuAKWq9hrd9PNhehIaGZuCsjLlTiXv4KvgUN5CSRnwKryvAkL6/4= X-Received: by 10.46.91.154 with SMTP id m26mr167955lje.111.1505047195050; Sun, 10 Sep 2017 05:39:55 -0700 (PDT) MIME-Version: 1.0 Received: by 10.46.0.221 with HTTP; Sun, 10 Sep 2017 05:39:54 -0700 (PDT) From: "Federico D'Ambrosio" Date: Sun, 10 Sep 2017 14:39:54 +0200 X-Gmail-Original-Message-ID: Message-ID: Subject: Is State access synchronized? To: user Content-Type: multipart/alternative; boundary="94eb2c06d4846e67790558d51bf0" archived-at: Sun, 10 Sep 2017 12:40:08 -0000 --94eb2c06d4846e67790558d51bf0 Content-Type: text/plain; charset="UTF-8" Hi, as per the mail subject I wanted to ask you if a State access (read and write) is synchronized. I have the following stream: val airtrafficEvents = stream .keyBy(_.flightInfo.flight) .map(new UpdateIdFunction()) where UpdateIdFunction is a RichMapFunction with a ValueState and a MapState, with the following map method def map(value: AirTrafficEvent): AirTrafficEventWithId = { val flight = value.flightInfo.flight val time = value.instantValues.time AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis)) } private def createOrGetId(_key: String, _time: Long): Int = { val tmpId = valuestate.value //Remove from MapState entries older than one minute val entry = Option[(Int, Long)](lookupMap.get(_key)) //update ValueState or MapState if needed //return current updated ValueState or corresponding ID from updated MapState } So, I'm using the MapState to track the integer IDs of the events of the stream, retaining only the latest records inside the MapState, and I'm using the ValueState to generate an incremental integer ID for said events. Given all of this, I'm really not sure how the mapping is applied to the keyedstream in input: is it guaranteed that each time the method is called I'm getting the latest and updated value/map? Thank you for your attention, Federico --94eb2c06d4846e67790558d51bf0 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Hi,

as per the mail subject I = wanted to ask you if a State access (read and write) is synchronized.
I have the following stream:

val airtrafficEvents =3D stream
.keyBy(_.
flightInfo.flight)
.map(new UpdateIdFunction())
=


where UpdateIdFunction is a RichMapFunction with a ValueStat= e and a MapState, with the following map method

def map(value: AirTrafficEvent): AirTrafficEven= tWithId =3D {

=C2=A0 val flight =3D value.flightInfo.flight
=C2= =A0 val time =3D value.instantValues.time

=C2=A0 AirTrafficEventWith= Id(value, createOrGetId(flight, time.getMillis))

}

private de= f createOrGetId(_key: String, _time: Long): Int =3D {

=C2=A0 val tmp= Id =3D valuestate.value

=C2=A0 //Remove from MapState entries older than one minut= e

=C2=A0= val entry =3D Option[(Int, Long)](lookupMap.get(_key))

=C2=A0 //update ValueState or = MapState if needed

=C2=A0 //return current updated ValueState or corresponding ID from= updated MapState

}

So, I'm using the MapState= to track the integer IDs of the events of the stream, retaining only the latest records inside the M= apState, and I'm using the ValueState to generate an incremental intege= r ID for said events.
Given all of this, I'm really not sure h= ow the mapping is applied to the keyedstream in input: is it guaranteed tha= t each time the method is called I'm getting the latest and updated val= ue/map?

<= div>Thank you for yo= ur attention,
Federico<= br>
--94eb2c06d4846e67790558d51bf0--