flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sihua Zhou (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
Date Sat, 28 Apr 2018 02:32:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Sihua Zhou reassigned FLINK-9269:
---------------------------------

    Assignee: Sihua Zhou

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-9269
>                 URL: https://issues.apache.org/jira/browse/FLINK-9269
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> {code:java}
> 					@Nonnull
> 					@Override
> 					protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception
{
> 						long startTime = System.currentTimeMillis();
> 						CheckpointStreamFactory.CheckpointStateOutputStream localStream =
> 							this.streamAndResultExtractor.getCheckpointOutputStream();
> 						DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
> 						serializationProxy.write(outView);
> 						long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
> 						for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups();
++keyGroupPos) {
> 							int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
> 							keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
> 							outView.writeInt(keyGroupId);
> 							for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet())
{
> 								try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream))
{
> 									DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
> 									kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
> 									cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
keyGroupId);
> 								} // this will just close the outer compression stream
> 							}
> 						}
> 						if (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
> 							KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
> 							SnapshotResult<StreamStateHandle> result =
> 								streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
> 							streamAndResultExtractor = null;
> 							logOperationCompleted(primaryStreamFactory, startTime);
> 							return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result,
kgOffs);
> 						}
> 						return SnapshotResult.empty();
> 					}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message