flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] AHeise commented on a change in pull request #12575: [FLINK-18094][checkpointing] Unifies the creation of BarrierHandlers and CheckpointedInputGate.
Date Thu, 11 Jun 2020 19:50:44 GMT

AHeise commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r439030513



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
 	private final ThreadSafeUnaligner threadSafeUnaligner;
 
 	CheckpointBarrierUnaligner(
-			int[] numberOfInputChannelsPerGate,
 			SubtaskCheckpointCoordinator checkpointCoordinator,
 			String taskName,
-			AbstractInvokable toNotifyOnCheckpoint) {
+			AbstractInvokable toNotifyOnCheckpoint,
+			IndexedInputGate... inputGates) {
 		super(toNotifyOnCheckpoint);
 
 		this.taskName = taskName;
-
-		final int numGates = numberOfInputChannelsPerGate.length;
-
-		gateChannelOffsets = new int[numGates];
-		for (int index = 1; index < numGates; index++) {
-			gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
-		}
-
-		final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
-		hasInflightBuffers = new boolean[totalNumChannels];
-
-		channelInfos = IntStream.range(0, numGates)
-			.mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex])
-				.mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)))
-			.flatMap(Function.identity())
+		this.channelInfos = Arrays.stream(inputGates)
+			.flatMap(gate -> gate.getChannels().stream().map(InputChannel::getChannelInfo))
 			.toArray(InputChannelInfo[]::new);
-
-		threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels,	checkNotNull(checkpointCoordinator), this);
+		hasInflightBuffers = new boolean[channelInfos.length];
+		threadSafeUnaligner = new ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), this);
+
+		gateChannelOffsets = new int[inputGates.length];
+		int offset = 0;
+		for (final IndexedInputGate gate: inputGates) {
+			gateChannelOffsets[gate.getGateIndex()] = offset;
+			offset += gate.getNumberOfInputChannels();
+		}

Review comment:
       That's a tough one; I was afraid you would point it out. The issue is that the offset
computations are all similar but, in the end, also quite different.
   `CheckpointBarrierAligner` creates a gate->offset map because it also creates an index->gate
array. In `resumeConsumption`, it uses that array to first resolve the gate for a given flattened
channel index and then delegates `resumeConsumption` to that gate with the adjusted index
(that's where the offset is needed).
   `CheckpointBarrierUnaligner` mainly uses the `InputChannelInfo` and hence creates a gate
index->offset map to compute the flattened index needed by the rest of the `BarrierHandler` logic.
   These two versions get the `SingleInputGate`s (only they have indices).
   
   `InputProcessorUtil.createCheckpointedInputGates` uses union input gates, so it cannot
use gate indexes and goes by list index instead. However, now that I write it down, this also
looks suspicious... I guess that to use simple offsets in `CheckpointedInput` (because of union
gates), we actually need to assume consecutive indices. @pnowojski, however, also saw some
strange cases; maybe he can add his observations.
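
   To make the difference between "offset by gate index" and "offset by list index" concrete, here
is a minimal stand-alone sketch. It is not Flink code: `GateOffsetSketch`, `Gate`, and the three
helper methods are hypothetical names used only for illustration, and `Gate` merely stands in for
something that knows its gate index and channel count. The first helper mirrors the loop in the
diff above; the second goes by position in the argument list.

```java
import java.util.Arrays;

/** Stand-alone illustration; none of these types are Flink classes. */
public class GateOffsetSketch {

    /** Hypothetical stand-in for an indexed gate: its gate index and channel count. */
    public static final class Gate {
        final int gateIndex;
        final int numChannels;

        public Gate(int gateIndex, int numChannels) {
            this.gateIndex = gateIndex;
            this.numChannels = numChannels;
        }
    }

    /** Offsets stored under each gate's own index, like the loop in the diff above. */
    public static int[] offsetsByGateIndex(Gate... gates) {
        int[] offsets = new int[gates.length];
        int offset = 0;
        for (Gate gate : gates) {
            // Throws ArrayIndexOutOfBoundsException if a gate index is outside 0..n-1.
            offsets[gate.gateIndex] = offset;
            offset += gate.numChannels;
        }
        return offsets;
    }

    /** Offsets stored under the position of each gate in the argument list. */
    public static int[] offsetsByListPosition(Gate... gates) {
        int[] offsets = new int[gates.length];
        int offset = 0;
        for (int position = 0; position < gates.length; position++) {
            offsets[position] = offset;
            offset += gates[position].numChannels;
        }
        return offsets;
    }

    /** Flattened channel index: offset of the gate plus the channel index within it. */
    public static int flatten(int[] offsets, int gateKey, int channelIndex) {
        return offsets[gateKey] + channelIndex;
    }

    public static void main(String[] args) {
        // Gates passed out of gate-index order: gate 1 (3 channels), then gate 0 (2 channels).
        Gate[] gates = {new Gate(1, 3), new Gate(0, 2)};

        System.out.println(Arrays.toString(offsetsByGateIndex(gates)));    // [3, 0]
        System.out.println(Arrays.toString(offsetsByListPosition(gates))); // [0, 3]
    }
}
```

   In this sketch the two schemes agree only when the gates arrive in gate-index order with
consecutive indices starting at 0, which is the assumption mentioned above.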
   





