spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #17179: [SPARK-19067][SS] Processing-time-based timeout i...
Date Tue, 14 Mar 2017 18:21:50 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17179#discussion_r105989232
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala ---
    @@ -61,25 +65,50 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
      *  - After that, if `update(newState)` is called, then `exists()` will again return `true`,
      *    `get()` and `getOption()`will return the updated value.
      *
    + * Important points to note about using `KeyedStateTimeout`.
    + *  - The timeout type is a global param across all the keys (set as `timeout` param in
    + *    `[map|flatMap]GroupsWithState`, but the exact timeout duration is configurable per key
    + *    (by calling `setTimeout...()` in `KeyedState`).
    + *  - When the timeout occurs for a key, the function is called with no values, and
    + *    `KeyedState.isTimingOut()` set to true.
    + *  - The timeout is reset for key every time the function is called on the key, that is,
    + *    when the key has new data, or the key has timed out. So the user has to set the timeout
    + *    duration every time the function is called, otherwise there will not be any timeout set.
    + *  - Guarantees provided on processing-time-based timeout of key, when timeout duration is D ms:
    + *    - Timeout will never be called before real clock time has advanced by D ms
    + *    - Timeout will be called eventually when there is a trigger in the query
    + *      (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
    + *      For example, the trigger interval of the query will affect when the timeout is actually hit.
    + *      If there is no data in the stream (for any key) for a while, then their will not be
    + *      any trigger and timeout will not be hit until there is data.
    + *
      * Scala example of using KeyedState in `mapGroupsWithState`:
      * {{{
      * // A mapping function that maintains an integer state for string keys and returns a string.
      * def mappingFunction(key: String, value: Iterator[Int], state: KeyedState[Int]): String = {
    - *   // Check if state exists
    - *   if (state.exists) {
    - *     val existingState = state.get  // Get the existing state
    - *     val shouldRemove = ...         // Decide whether to remove the state
    + *
    + *   if (state.isTimingOut) {                // If called when timing out, remove the state
    + *     state.remove()
    + *
    + *   } else if (state.exists) {              // If state exists, use it for processing
    + *     val existingState = state.get         // Get the existing state
    + *     val shouldRemove = ...                // Decide whether to remove the state
      *     if (shouldRemove) {
    - *       state.remove()     // Remove the state
    + *       state.remove()                      // Remove the state
    + *
      *     } else {
      *       val newState = ...
    - *       state.update(newState)    // Set the new state
    + *       state.update(newState)              // Set the new state
      *     }
    + *
      *   } else {
      *     val initialState = ...
    - *     state.update(initialState)  // Set the initial state
    + *     state.update(initialState)            // Set the initial state
      *   }
    - *   ... // return something
    + *   state.setTimeoutDuration("1 hour")      // Set the timeout
    --- End diff --
    
    It does not. Once state is removed, timeouts are disabled. I will add this to the docs.
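
The rule above (no timeout once the state is removed) can be sketched with a toy model. This is not Spark's `KeyedState` implementation: `ToyKeyedState` and `pendingTimeoutMs` are hypothetical names, and silently ignoring a timeout request made after `remove()` is an assumption made for illustration. The real API may reject such a call instead.

```scala
// Toy model only: NOT Spark's KeyedState implementation.
// ToyKeyedState and pendingTimeoutMs are hypothetical names for illustration.
final class ToyKeyedState[S] {
  private var value: Option[S] = None
  private var timeoutMs: Option[Long] = None

  def exists: Boolean = value.isDefined

  def update(newState: S): Unit = { value = Some(newState) }

  // Removing the state also drops any timeout that was set earlier.
  def remove(): Unit = { value = None; timeoutMs = None }

  // Assumed behavior: a timeout request is recorded only while state exists.
  def setTimeoutDuration(ms: Long): Unit = if (exists) timeoutMs = Some(ms)

  // The timeout that would be armed after the function returns, if any.
  def pendingTimeoutMs: Option[Long] = if (exists) timeoutMs else None
}

object ToyKeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new ToyKeyedState[Int]

    // With state present, the timeout is recorded.
    state.update(1)
    state.setTimeoutDuration(3600000L)
    assert(state.pendingTimeoutMs.contains(3600000L))

    // After remove(), a later timeout request has no effect in this model.
    state.remove()
    state.setTimeoutDuration(3600000L)
    assert(state.pendingTimeoutMs.isEmpty)
  }
}
```

In this model, a function that calls `state.remove()` and then reaches a trailing `setTimeoutDuration` call, as in the Scaladoc example in the diff, leaves no timeout armed for that key.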

