flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
Date Mon, 06 Mar 2017 15:00:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897446#comment-15897446 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104432699
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long time) {
     	}
     
     	/**
    +	 * For now keyed state is not allowed in ProcessWindowFunctions
    +	 */
    +	public class MergingKeyStore implements KeyedStateStore {
    +
    +		protected W window;
    +
    +		@Override
    +		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    +			throw new RuntimeException("keyedState is not allowed in merging windows");
    +		}
    +
    +		@Override
    +		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
    +			throw new RuntimeException("keyedState is not allowed in merging windows");
    +		}
    +
    +		@Override
    +		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
    +			throw new RuntimeException("keyedState is not allowed in merging windows");
    +		}
    +
    +		@Override
    +		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
    +			throw new RuntimeException("keyedState is not allowed in merging windows");
    +		}
    +
    +		@Override
    +		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    +			throw new RuntimeException("keyedState is not allowed in merging windows");
    +		}
    +	}
    +
    +	public class WindowPaneKeyStore implements KeyedStateStore {
    +
    +		protected W window;
    +
    +		@Override
    +		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    +			try {
    +				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Could not retrieve state", e);
    +			}
    +		}
    +
    +		@Override
    +		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
    +			try {
    +				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Could not retrieve state", e);
    +			}
    +		}
    +
    +		@Override
    +		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
    +			try {
    +				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Could not retrieve state", e);
    +			}
    +		}
    +
    +		@Override
    +		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
    +			try {
    +				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Could not retrieve state", e);
    +			}
    +		}
    +
    +		@Override
    +		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    +			try {
    +				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Could not retrieve state", e);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * {@code WindowContext} is a utility for handling {@code ProcessWindowFunction} invocations. It can be reused
    +	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
    +	 * the {@code WindowContext}
    +	 */
    +	public class WindowContext implements InternalWindowFunction.InternalWindowContext {
    +		protected W window;
    +
    +		protected WindowPaneKeyStore windowPaneKeyStore;
    +		protected MergingKeyStore mergingKeyStore;
    +
    +		public WindowContext(W window) {
    +			this.window = window;
    +			this.windowPaneKeyStore = new WindowPaneKeyStore();
    +			this.mergingKeyStore = new MergingKeyStore();
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return "WindowContext{Window = " + window.toString() + "}";
    +		}
    +
    +		public void clear() throws Exception {
    +			userFunction.clear(window, this);
    +		}
    +
    +		@Override
    +		public KeyedStateStore windowState() {
    +			if (windowAssigner instanceof MergingWindowAssigner) {
    --- End diff --
    
    Instead of checking on every call, you could initialise the `WindowContext` at the beginning with either the `WindowPaneKeyStore` or the (exception-throwing) `MergingKeyStore`.

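The reviewer's suggestion can be sketched outside Flink as follows. This is a minimal, self-contained illustration, not Flink code: `SimpleStateStore`, `RejectingStateStore`, `PaneStateStore` and `PreselectedWindowContext` are hypothetical simplified stand-ins for `KeyedStateStore`, `MergingKeyStore`, `WindowPaneKeyStore` and `WindowContext`, and a boolean flag stands in for the `windowAssigner instanceof MergingWindowAssigner` check. The store is chosen once, when the context is built.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Flink's KeyedStateStore: a single
// counter accessor instead of one getter per state type.
interface SimpleStateStore {
    long incrementAndGet(String stateName);
}

// Plays the role of MergingKeyStore in the diff: every access fails.
final class RejectingStateStore implements SimpleStateStore {
    @Override
    public long incrementAndGet(String stateName) {
        throw new UnsupportedOperationException(
                "per-window state is not allowed in merging windows");
    }
}

// Plays the role of WindowPaneKeyStore: holds the state of one window pane.
final class PaneStateStore implements SimpleStateStore {
    private final Map<String, Long> counters = new HashMap<>();

    @Override
    public long incrementAndGet(String stateName) {
        return counters.merge(stateName, 1L, Long::sum);
    }
}

// Hypothetical context: the store is picked once, in the constructor, so
// windowState() does not re-check the assigner type on every call.
final class PreselectedWindowContext {
    private final SimpleStateStore windowStore;

    PreselectedWindowContext(boolean mergingAssigner) {
        this.windowStore = mergingAssigner
                ? new RejectingStateStore()
                : new PaneStateStore();
    }

    SimpleStateStore windowState() {
        return windowStore;
    }
}
```

Because the assigner is fixed for the lifetime of the operator, selecting the store up front should behave the same as checking on every call while removing the repeated `instanceof` test; a merging context still fails only when state is actually accessed.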

> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
>                 Key: FLINK-5929
>                 URL: https://issues.apache.org/jira/browse/FLINK-5929
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For example, if a window is expected to fire several times (due to early and late {{Trigger}} firings), a user can keep per-window state to carry information between those firings.
> The per-window state has to be cleaned up in some way. For this I see two options:
>  - Keep track of all state that a user uses and clean up when we reach the window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} that is called when we reach the window GC horizon and that users can (and should) use to clean up their state.
> On the API side, we can add a method {{windowState()}} to {{ProcessWindowFunction.Context}} that retrieves the per-window state, and a method {{globalState()}} that gives access to the (already available) global state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
>     /**
>      * @return The window that is being evaluated.
>      */
>     public abstract W window();
>     /**
>      * State accessor for per-key and per-window state.
>      */
>     public abstract KeyedStateStore windowState();
>     /**
>      * State accessor for per-key global state.
>      */
>     public abstract KeyedStateStore globalState();
> }
> {code}

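The issue's point that per-key state is shared by all windows of a key, while early and late firings need information that belongs to a single window, can be illustrated with a small namespace-scoped store. This is a hypothetical sketch (`NamespacedStore`, `GLOBAL` and `emitDelta` are illustrative names, not Flink API): every value is addressed by key, namespace and state name, and using the window as the namespace yields per-window state, much as the diff passes `window` as the first argument of `getPartitionedState`. The example keeps the amount already emitted for a window so that a later firing emits only the difference.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical namespace-scoped store: each value is addressed by
// (key, namespace, stateName). One fixed namespace gives per-key "global"
// state; using the window as the namespace gives per-window state.
final class NamespacedStore {
    static final String GLOBAL = "<global>";

    private final Map<List<String>, Long> values = new HashMap<>();

    long get(String key, String namespace, String stateName) {
        return values.getOrDefault(List.of(key, namespace, stateName), 0L);
    }

    void put(String key, String namespace, String stateName, long value) {
        values.put(List.of(key, namespace, stateName), value);
    }

    // Information carried between firings: emit only what changed since the
    // previous firing recorded under the same namespace.
    long emitDelta(String key, String namespace, long currentSum) {
        long alreadyEmitted = get(key, namespace, "emitted");
        put(key, namespace, "emitted", currentSum);
        return currentSum - alreadyEmitted;
    }
}
```

With per-window namespaces, a firing of `[0,10)` (sum 5), a firing of `[10,20)` (sum 7) and then a late firing of `[0,10)` (sum 8) emit 5, 7 and 3. With the single global namespace the same sequence emits 5, 2 and 1, because the two windows overwrite each other's bookkeeping.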

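The first cleanup option in the issue (track all state a user registers and drop it at the window GC horizon) might look roughly like the sketch below. It assumes event-time windows identified by their max timestamp and a GC horizon of max timestamp plus allowed lateness; `TrackingWindowState` and its methods are hypothetical, and a watermark-driven sweep stands in for whatever timer mechanism the operator would really use.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of cleanup option 1: every piece of per-window state a
// user registers is recorded, and all of it is dropped once the window's GC
// horizon (assumed: max timestamp + allowed lateness) has been reached.
final class TrackingWindowState {
    private final long allowedLateness;
    // window max timestamp -> (state name -> value)
    private final Map<Long, Map<String, Long>> stateByWindow = new HashMap<>();

    TrackingWindowState(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    void put(long windowMaxTimestamp, String stateName, long value) {
        stateByWindow
                .computeIfAbsent(windowMaxTimestamp, w -> new HashMap<>())
                .put(stateName, value);
    }

    Long get(long windowMaxTimestamp, String stateName) {
        Map<String, Long> state = stateByWindow.get(windowMaxTimestamp);
        return state == null ? null : state.get(stateName);
    }

    // Called as the watermark advances; returns how many state entries were dropped.
    int onWatermark(long watermark) {
        int dropped = 0;
        Iterator<Map.Entry<Long, Map<String, Long>>> it = stateByWindow.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            long cleanupTime = entry.getKey() + allowedLateness;
            if (watermark >= cleanupTime) {
                dropped += entry.getValue().size();
                it.remove();
            }
        }
        return dropped;
    }
}
```

With an allowed lateness of 5, the state of a window `[0,10)` (max timestamp 9) survives until the watermark reaches 14. The second option would call a user hook at that point instead; the `WindowContext.clear()` method in the diff, which forwards to `userFunction.clear(window, this)`, points in that direction.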

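The proposed `Context` can be exercised without Flink by using stand-in types. In this hypothetical sketch, `CounterStore` replaces `KeyedStateStore` (long counters only), `SketchContext` mirrors the proposed `window()`/`windowState()`/`globalState()` shape, `FixedContext` is a trivial implementation, and `FiringCounter` is an illustrative user function whose `clear()` plays the role of the cleanup hook from the second option.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal stand-in for KeyedStateStore, limited to long counters.
interface CounterStore {
    long get(String name);
    void set(String name, long value);
    void clear(String name);
}

final class MapCounterStore implements CounterStore {
    private final Map<String, Long> values = new HashMap<>();

    @Override public long get(String name) { return values.getOrDefault(name, 0L); }
    @Override public void set(String name, long value) { values.put(name, value); }
    @Override public void clear(String name) { values.remove(name); }
}

// Same shape as the proposed Context: window(), windowState(), globalState().
abstract class SketchContext<W> {
    public abstract W window();
    public abstract CounterStore windowState();
    public abstract CounterStore globalState();
}

final class FixedContext<W> extends SketchContext<W> {
    private final W window;
    private final CounterStore windowState;
    private final CounterStore globalState;

    FixedContext(W window, CounterStore windowState, CounterStore globalState) {
        this.window = window;
        this.windowState = windowState;
        this.globalState = globalState;
    }

    @Override public W window() { return window; }
    @Override public CounterStore windowState() { return windowState; }
    @Override public CounterStore globalState() { return globalState; }
}

// Hypothetical user function: numbers each window's firings with per-window
// state and counts all firings of the key with global state. clear() is the
// option-2 cleanup hook; it drops only the per-window state.
final class FiringCounter<W> {
    String process(SketchContext<W> ctx) {
        long inWindow = ctx.windowState().get("firings") + 1;
        ctx.windowState().set("firings", inWindow);
        long forKey = ctx.globalState().get("firings") + 1;
        ctx.globalState().set("firings", forKey);
        return ctx.window() + " firing #" + inWindow + " (key total " + forKey + ")";
    }

    void clear(SketchContext<W> ctx) {
        ctx.windowState().clear("firings");
    }
}
```

Two firings of `[0,10)` followed by one of `[10,20)` number the firings 1, 2 and 1 per window while the key total reaches 3; calling `clear()` for `[0,10)` resets only that window's counter and leaves the global count untouched.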
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
