From: Salva Alcántara
To: user@flink.apache.org
Date: Mon, 6 Apr 2020 01:30:00 -0500 (CDT)
Subject: On efficient checkpoints with dynamic (self-evolving) keyed state

In a KeyedCoProcessFunction, I am managing a keyed state which consists of third-party library models. These models are created in `processElement1` when new data arrives on the control stream.

Because the models are self-evolving, in the sense that they have their own internal state, I need to make sure that they are serialized into `modelsBytes` whenever their state changes. My first attempt goes like this:

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// `Control`, `Data` and `Prediction` are my own types; `Model` comes from the third-party library
class MyOperator
    extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  type Ctx = KeyedCoProcessFunction[String, Control, Data, Prediction]#Context

  // To hold loaded models
  @transient private var models: mutable.HashMap[String, Model] = _
  // For serialization purposes
  @transient private var modelsBytes: MapState[String, Array[Byte]] = _
  // Set when restoring from a checkpoint
  @transient private var needsRestore: Boolean = false

  override def processElement1(control: Control, ctx: Ctx, out: Collector[Prediction]): Unit = {
    if (needsRestore) restoreModels()
    // - Create new model out of `control` element
    // - Add it to `models` keyed state
  }

  override def processElement2(data: Data, ctx: Ctx, out: Collector[Prediction]): Unit = {
    if (needsRestore) restoreModels()
    // - Send `data` element to the corresponding models
    //   This will update their internal states
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes) // `toBytes` is the library's (expensive) serializer
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    models = mutable.HashMap.empty[String, Model]
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[String, Array[Byte]]("modelsBytes", classOf[String], classOf[Array[Byte]]))
    if (context.isRestored) needsRestore = true
  }

  private def restoreModels(): Unit = {
    // - Rebuild `models` from the entries in `modelsBytes`
    needsRestore = false
  }
}
```

So, the idea is to use `snapshotState` to overwrite the *keyed* state entries in `modelsBytes`. I am trying this approach because serializing the models (`model.toBytes`) might be an expensive operation, so I would prefer to do it only once per model, when a checkpoint comes.

The problem is that this approach might be inherently/conceptually wrong. Here is why: even if the code within `snapshotState` compiles and runs just fine, I am accessing a piece of keyed state without a keyed context being passed in, so it is not clear at all which key I am actually working on.

I have written a small test to verify the checkpoints, and I have observed that from time to time I get an empty state back, even though the `modelsBytes` entries were updated in `snapshotState`. So snapshotting my models like this does not seem reliable at all.

What confuses me is that the user is perfectly allowed to do this. Maybe the `put` method should raise an exception to make it clear that a keyed context is required in the first place; otherwise it gives false hope and might lead to hard-to-spot bugs. As a matter of fact, shouldn't this be considered a bug?

The other option I have is, of course, to serialize my models in `processElement2`, after sending new data elements to them.
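
The two options above can be contrasted with a toy simulation in plain Scala. This is not Flink code: `ToyKeyedMapState`, `KeyedContextToy` and the serialization counter are hypothetical stand-ins. The sketch assumes, as I understand Flink's keyed state backends to behave, that every keyed-state access is resolved against the backend's current key, which the runtime sets before each element and does not reset before `snapshotState`, so writes made there land under whichever key was processed last.

```scala
import scala.collection.mutable

// Toy stand-in for keyed MapState (NOT Flink's implementation): every put/get is
// implicitly scoped to `currentKey`, which the "runtime" sets before each element.
final class ToyKeyedMapState[UV] {
  private val table = mutable.Map.empty[String, mutable.Map[String, UV]]
  var currentKey: Option[String] = None

  private def scope: mutable.Map[String, UV] = currentKey match {
    case Some(k) => table.getOrElseUpdate(k, mutable.Map.empty[String, UV])
    // The toy chooses to fail loudly if no key has ever been set.
    case None    => throw new IllegalStateException("No key set: not in a keyed context")
  }

  def put(userKey: String, value: UV): Unit = scope.put(userKey, value)
  def get(userKey: String): Option[UV] = scope.get(userKey)
}

object KeyedContextToy {
  // Counts calls to the (assumed expensive) model serialization.
  var serializations = 0
  def toBytes(observations: Int): Array[Byte] = {
    serializations += 1
    BigInt(observations).toByteArray
  }

  private val elements = Seq("a", "b", "a", "b") // keys of incoming data elements

  // For each stream key, can it see its own entry after a restore?
  private def visibleAfterRestore(state: ToyKeyedMapState[Array[Byte]], keys: Iterable[String]) =
    keys.map { k => state.currentKey = Some(k); k -> state.get(k).isDefined }.toMap

  // Option 1: write everything in "snapshotState", where nobody sets the key for us.
  def snapshotTimeWrites(): Map[String, Boolean] = {
    val state = new ToyKeyedMapState[Array[Byte]]
    val models = mutable.Map("a" -> 0, "b" -> 0)
    for (k <- elements) { state.currentKey = Some(k); models(k) += 1 }
    // The key of the last processed element ("b") is still current here.
    for ((k, obs) <- models) state.put(k, toBytes(obs))
    visibleAfterRestore(state, models.keys)
  }

  // Option 2: serialize right after each update, inside the keyed context.
  def perElementWrites(): Map[String, Boolean] = {
    val state = new ToyKeyedMapState[Array[Byte]]
    val models = mutable.Map("a" -> 0, "b" -> 0)
    for (k <- elements) {
      state.currentKey = Some(k)
      models(k) += 1
      state.put(k, toBytes(models(k))) // one serialization per data element
    }
    visibleAfterRestore(state, models.keys)
  }
}
```

In this toy run, writing at snapshot time costs one serialization per model (two in total), but both entries end up under key `b`, so key `a` finds nothing after the restore, which is consistent with the intermittent empty state described above. Writing per element keeps each entry under its own key, at the price of one serialization per data element (four in total).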
However, continuously serializing my models to update `modelsBytes` might be costly. What would be the most efficient way to handle this scenario?

-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/