flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Date Fri, 16 Feb 2018 10:11:42 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168717057
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
---
    @@ -223,155 +223,110 @@ public StreamOperatorStateContext streamOperatorStateContext(
     
     	protected OperatorStateBackend operatorStateBackend(
     		String operatorIdentifierText,
    -		OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +		PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
     		CloseableRegistry backendCloseableRegistry) throws Exception {
     
    -		//TODO search in local state for a local recovery opportunity.
    +		BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer
=
    +			new BackendRestorerProcedure<>(
    +				() -> stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
    +				backendCloseableRegistry);
     
    -		return createOperatorStateBackendFromJobManagerState(
    -			operatorIdentifierText,
    -			operatorSubtaskStateFromJobManager,
    -			backendCloseableRegistry);
    +		return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
     	}
     
     	protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
     		TypeSerializer<K> keySerializer,
     		String operatorIdentifierText,
    -		OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +		PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
     		CloseableRegistry backendCloseableRegistry) throws Exception {
     
     		if (keySerializer == null) {
     			return null;
     		}
     
    -		//TODO search in local state for a local recovery opportunity.
    +		TaskInfo taskInfo = environment.getTaskInfo();
    +
    +		final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
    +			taskInfo.getMaxNumberOfParallelSubtasks(),
    +			taskInfo.getNumberOfParallelSubtasks(),
    +			taskInfo.getIndexOfThisSubtask());
     
    -		return createKeyedStatedBackendFromJobManagerState(
    -			keySerializer,
    -			operatorIdentifierText,
    -			operatorSubtaskStateFromJobManager,
    -			backendCloseableRegistry);
    +		BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle>
backendRestorer =
    +			new BackendRestorerProcedure<>(
    +				() -> stateBackend.createKeyedStateBackend(
    +					environment,
    +					environment.getJobID(),
    +					operatorIdentifierText,
    +					keySerializer,
    +					taskInfo.getMaxNumberOfParallelSubtasks(),
    +					keyGroupRange,
    +					environment.getTaskKvStateRegistry()),
    +				backendCloseableRegistry);
    +
    +		return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
     	}
     
     	protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(
    -		OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +		Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives)
{
     
    -		if (operatorSubtaskStateFromJobManager != null) {
    +		if (restoreStateAlternatives.hasNext()) {
     
     			final CloseableRegistry closeableRegistry = new CloseableRegistry();
     
    -			Collection<OperatorStateHandle> rawOperatorState =
    -				operatorSubtaskStateFromJobManager.getRawOperatorState();
    -
    -			return new CloseableIterable<StatePartitionStreamProvider>() {
    -				@Override
    -				public void close() throws IOException {
    -					closeableRegistry.close();
    -				}
    -
    -				@Nonnull
    -				@Override
    -				public Iterator<StatePartitionStreamProvider> iterator() {
    -					return new OperatorStateStreamIterator(
    -						DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
    -						rawOperatorState.iterator(), closeableRegistry);
    -				}
    -			};
    -		}
    -
    -		return CloseableIterable.empty();
    -	}
    -
    -	protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(
    -		OperatorSubtaskState operatorSubtaskStateFromJobManager) {
    +			Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
    +			// TODO currently this does not support local state recovery, so we expect there is
only one handle.
    +			Preconditions.checkState(!restoreStateAlternatives.hasNext());
    --- End diff --
    
    👍 


---

Mime
View raw message