flink-user mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?
Date Mon, 02 Dec 2019 17:02:25 GMT
Hi Salva

As I pointed out, your program logic becomes unclear if you call 'state.clear()' within
'snapshotState', because you do not know what the current key is at that point. Hence, I do
not think that approach makes sense.

From my point of view, the fact that 'clear' sometimes works in your code is not a bug in the
current Flink framework. Currently, we set currentKey when processing a record. However, Flink
does not reset the current key to null afterwards, since there is no life cycle step for doing
so today. There seems to be no benefit in introducing one, and it might cause a performance
regression because it would add extra steps.
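
To illustrate the point, here is a toy sketch in plain Scala. It is not Flink's actual implementation: `ToyKeyedBackend`, `setCurrentKey`, `entriesFor` and the exception type are hypothetical names chosen for this example. It only assumes what is described above, namely that the current key is set when a record is processed and is never reset.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend. Hypothetical; not Flink's code.
final class ToyKeyedBackend[K] {
  // Key of the most recently processed record; never reset after processing.
  private var currentKey: Option[K] = None
  // One private map per key, loosely mimicking how MapState is scoped by key.
  private val store = mutable.Map.empty[K, mutable.Map[String, String]]

  // Called (in this toy) before each record is processed.
  def setCurrentKey(key: K): Unit = { currentKey = Some(key) }

  // Resolves the map for the current key, failing if no key was ever set.
  private def scoped: mutable.Map[String, String] = {
    val key = currentKey.getOrElse(
      throw new IllegalStateException("no current key set"))
    store.getOrElseUpdate(key, mutable.Map.empty[String, String])
  }

  def put(name: String, value: String): Unit = scoped.update(name, value)
  def clear(): Unit = scoped.clear()

  // Inspection helper for the demo: entries stored under a given key.
  def entriesFor(key: K): Map[String, String] =
    store.get(key).map(_.toMap).getOrElse(Map.empty[String, String])
}

object ToyKeyedBackendDemo {
  def main(args: Array[String]): Unit = {
    val backend = new ToyKeyedBackend[String]

    // "Snapshot" before any record: there is no key, so clear() fails.
    val failedWithoutKey =
      try { backend.clear(); false }
      catch { case _: IllegalStateException => true }
    println(s"clear() failed without a key: $failedWithoutKey")

    backend.setCurrentKey("a"); backend.put("m1", "x")
    backend.setCurrentKey("b"); backend.put("m2", "y")

    // "Snapshot" after records: clear() silently hits only the last key, "b".
    backend.clear()
    println(s"key a: ${backend.entriesFor("a")}")
    println(s"key b: ${backend.entriesFor("b")}")
  }
}
```

In this toy model, 'clear' called outside of record processing either fails (no key was ever set) or affects only whichever key happened to be processed last, which is one way to picture why the call appears to work only sometimes.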

Yun Tang

On 12/2/19, 9:29 PM, "Salva Alcántara" <salcantaraphd@gmail.com> wrote:

    Hi Yun,
    Thanks for your reply. You mention that
    "‘snapshotState’ and ‘initializeState’ interfaces are used mainly to
    snapshot and initialize for operator state"
    but "mainly" is not "exclusively", right? So I guess my question is
    whether doing something like this is valid and makes sense:

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        if (models.nonEmpty) {
          for ((k, model) <- models) {
            modelsBytes.put(k, model.toBytes(v))
          }
        }
      }

    Indeed, the above code seems to work well, so it seems like a bug that
    `clear` sometimes works and sometimes does not, as I noted in my reply to
    Congxian and as others have noted in this extended question posted on Stack Overflow:
