flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From makeyang <...@git.apache.org>
Subject [GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service
Date Thu, 10 May 2018 10:43:53 GMT
Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187297365
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId,
long times
     	 *
     	 * @param context context that provides information and means required for taking a
snapshot
     	 */
    -	public void snapshotState(StateSnapshotContext context) throws Exception {
    +	public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress)
throws Exception {
     		if (getKeyedStateBackend() != null) {
     			KeyedStateCheckpointOutputStream out;
    -
    +			OperatorStateCheckpointOutputStream metaOut;
     			try {
     				out = context.getRawKeyedOperatorStateOutput();
     			} catch (Exception exception) {
     				throw new Exception("Could not open raw keyed operator state stream for " +
     					getOperatorName() + '.', exception);
     			}
    -
     			try {
    -				KeyGroupsList allKeyGroups = out.getKeyGroupList();
    -				for (int keyGroupIdx : allKeyGroups) {
    -					out.startNewKeyGroup(keyGroupIdx);
    -
    -					timeServiceManager.snapshotStateForKeyGroup(
    -						new DataOutputViewStreamWrapper(out), keyGroupIdx);
    -				}
    +				metaOut = context.getRawKeyedOperatorStateMetaOutput();
     			} catch (Exception exception) {
    -				throw new Exception("Could not write timer service of " + getOperatorName() +
    -					" to checkpoint state stream.", exception);
    -			} finally {
    -				try {
    -					out.close();
    -				} catch (Exception closeException) {
    -					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
    -						"might have prevented deleting some state data.", getOperatorName(), closeException);
    -				}
    +				throw new Exception("Could not open raw operator state stream for " +
    +					getOperatorName() + '.', exception);
     			}
    +			final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>>
ret = timeServiceManager.startOneSnapshotState();
    +			final int currentSnapshotVersion = ret.f0;
    +			final Map<String, HeapInternalTimerService> timerServices = ret.f1;
    +			final Integer stateTableVersion = ret.f2;
    +			final TreeSet<Integer> snapshotVersions = ret.f3;
    +			LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
    +			Callable<Boolean> snapshotTimerCallable = new Callable() {
    +				@Override
    +				public Boolean call() {
    +					try {
    +						KeyGroupsList allKeyGroups = out.getKeyGroupList();
    +						metaOut.startNewPartition();
    +						DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +						metaWrapper.writeInt(stateTableVersion);
    +						if (snapshotVersions.size() > 0) {
    +							metaWrapper.writeInt(snapshotVersions.size());
    +							for (Integer i : snapshotVersions) {
    +								metaWrapper.writeInt(i);
    +							}
    +						}
    +						else {
    +							metaWrapper.writeInt(0);
    +						}
    +						int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
    +						metaWrapper.writeInt(keyGroupCount);
    +						for (int keyGroupIdx : allKeyGroups) {
    +							out.startNewKeyGroup(keyGroupIdx);
    +							metaWrapper.writeInt(keyGroupIdx);
    +							InternalTimerServiceSerializationProxy serializationProxy =
    +								new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +									currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +							serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +						}
    +						LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
    +						return true;
    +					} catch (Exception exception) {
    +						LOG.error("Could not write timer service of " + getOperatorName() +
    +							" to checkpoint state stream.", exception);
    +						return false;
    +					} finally {
    +						timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +						StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl)
context;
    +						try {
    +							snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +						} catch (IOException e) {
    +							LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +							return false;
    --- End diff --
    
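    How about changing the callable's return type from Boolean to a Tuple that also carries the Throwable when an exception
happens, so the caller can see why the snapshot failed instead of only getting `false`?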


---

Mime
View raw message