spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brkyvz <...@git.apache.org>
Subject [GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
Date Tue, 17 Oct 2017 23:31:29 GMT
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145282877
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days",
etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming
query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than "
+
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming
query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note In a streaming query, this can be called only when watermark is set before
calling
    +   *       `[map/flatmap]GroupsWithState`. In a batch query, this method always returns
-1.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if watermark has not been set before in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    +
    +
    +  /**
    +   * Get the current event time watermark.
    --- End diff --
    
    `Get the current processing time`? This is equivalent to having `current_timestamp()`
as a column, right? Will it also be a constant value across the trigger, or will it be static
for the duration of the trigger?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message