From: Victor Godoy Poluceno
Date: Tue, 25 Jul 2017 09:45:49 -0300
Subject: Unable to make mapWithState work correctly
To: user@flink.apache.org
Hi,

I am trying to write a simple streaming program that counts values from a Kafka topic in a fault-tolerant manner, like this:

import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Configure the filesystem state backend and the directory checkpoints are written to.
val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.enableCheckpointing(10)

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
  .map((_, 1))
  .keyBy(_._1)
  // mapWithState keeps the running count per key in Flink's keyed state.
  .mapWithState((in: (String, Int), count: Option[Int]) => {
    val newCount = in._2 + count.getOrElse(0)
    ((in._1, newCount), Some(newCount))
  }).print

env.execute("Job")

The idea is to use the filesystem state backend to persist the computation state (the count) and to restore it after a failure or restart. I have a program that keeps injecting records with the same key into Kafka (a minimal sketch of it is at the end of this mail). But I cannot make Flink behave as expected: every time Flink restarts, the state is empty and the count starts again from 1, instead of continuing from the last checkpointed value. What am I missing here?

I am running this on a local environment (sbt run) with Flink 1.3.1, Java 1.8.0_131, and Ubuntu 16.04.
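In case it helps, here is a minimal sketch of what the injector does (simplified, using the plain Kafka producer API; the topic name "test" matches the job above, while the value "hello" and the one-second interval are just illustrative):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Injector extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Send the same value over and over. The job keys on the record value,
  // so the running count for "hello" should keep increasing across restarts.
  while (true) {
    producer.send(new ProducerRecord[String, String]("test", "hello"))
    Thread.sleep(1000)
  }
}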
--
hooray!

--
Victor Godoy Poluceno