flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
Date Fri, 28 Jul 2017 12:57:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104908#comment-16104908 ]

ASF GitHub Bot commented on FLINK-6215:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130081876
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
---
    @@ -61,52 +65,89 @@
     	 * @param end End of the range of numbers to emit.
     	 */
     	public StatefulSequenceSource(long start, long end) {
    +		Preconditions.checkArgument(start <= end);
     		this.start = start;
     		this.end = end;
     	}
     
     	@Override
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -		Preconditions.checkState(this.checkpointedState == null,
    +		Preconditions.checkState(checkpointedState == null,
     			"The " + getClass().getSimpleName() + " has already been initialized.");
     
     		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
     			new ListStateDescriptor<>(
    -				"stateful-sequence-source-state",
    -				LongSerializer.INSTANCE
    +				"stateful-sequence-source-state", 
    +					new TupleSerializer<>(
    +							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +							new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +					)
     			)
     		);
     
    -		this.valuesToEmit = new ArrayDeque<>();
    +		this.endToNextOffsetMapping = new HashMap<>();
     		if (context.isRestored()) {
    -			// upon restoring
    -
    -			for (Long v : this.checkpointedState.get()) {
    -				this.valuesToEmit.add(v);
    +			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    +				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +				Preconditions.checkState(prev == null,
    +						getClass().getSimpleName() + " : Duplicate entry when restoring.");
     			}
     		} else {
    -			// the first time the job is executed
    -
    -			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
     			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    -			final long congruence = start + taskIdx;
    +			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +			final long totalElements = Math.abs(end - start + 1L);
    +			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
     
    -			long totalNoOfElements = Math.abs(end - start + 1);
    -			final int baseSize = safeDivide(totalNoOfElements, stepSize);
    -			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    +			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +			int localStartIdx = localPartitionRange.f0;
    +			int localEndIdx = localStartIdx + localPartitionRange.f1;
     
    -			for (long collected = 0; collected < toCollect; collected++) {
    -				this.valuesToEmit.add(collected * stepSize + congruence);
    +			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
    +				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
    +				endToNextOffsetMapping.put(limits.f1, limits.f0);
     			}
     		}
     	}
     
    +	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
    +		int minPartitionSliceSize = totalPartitions / parallelTasks;
    +		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
    +
    +		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
    +		int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +		return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +	}
    +
    +	private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
    +		long minElementPartitionSize = totalElements / totalPartitions;
    +		long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
    +		long startOffset = start;
    +
    +		for (int idx = 0; idx < partitionIdx; idx++) {
    +			long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +			startOffset += partitionSize;
    +		}
    +
    +		long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +		return new Tuple2<>(startOffset, startOffset + partitionSize);
    +	}
    +
     	@Override
     	public void run(SourceContext<Long> ctx) throws Exception {
    -		while (isRunning && !this.valuesToEmit.isEmpty()) {
    -			synchronized (ctx.getCheckpointLock()) {
    -				ctx.collect(this.valuesToEmit.poll());
    +		for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    I wonder if it would make sense to emit the local ranges in order of increasing "end offsets". That way the emitted values are at least still always increasing locally.
    While we can't really guarantee global ordering with this new rescalable implementation, we could still make a best effort on that locally. What do you think?
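A minimal standalone sketch of that local ordering idea (hypothetical code, not the actual PR: `endToNextOffset` stands in for the PR's `endToNextOffsetMapping`, which maps each partition's exclusive end offset to the next value to emit):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderedEmitSketch {

	// Emit all locally owned partitions sorted by their (exclusive) end offset,
	// so the values produced by one subtask are monotonically increasing.
	static List<Long> emitSorted(Map<Long, Long> endToNextOffset) {
		List<Map.Entry<Long, Long>> partitions = new ArrayList<>(endToNextOffset.entrySet());
		partitions.sort(Map.Entry.comparingByKey());

		List<Long> emitted = new ArrayList<>();
		for (Map.Entry<Long, Long> partition : partitions) {
			// key = exclusive end offset, value = next offset to emit
			for (long v = partition.getValue(); v < partition.getKey(); v++) {
				emitted.add(v); // in the real source this would be ctx.collect(v)
			}
		}
		return emitted;
	}

	public static void main(String[] args) {
		Map<Long, Long> local = new HashMap<>();
		local.put(10L, 5L); // partition [5, 10)
		local.put(5L, 0L);  // partition [0, 5)
		System.out.println(emitSorted(local)); // 0..9 in increasing order
	}
}
```

Sorting happens once per run, so the extra cost is negligible next to the emission loop itself.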


> Make the StatefulSequenceSource scalable.
> -----------------------------------------
>
>                 Key: FLINK-6215
>                 URL: https://issues.apache.org/jira/browse/FLINK-6215
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to emit first and keeps them in memory. This is not scalable, as large sequences of elements can lead to out-of-memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the {{maxParallelism}} parameter, and just keep state (to checkpoint) per such partition.
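For illustration, the per-partition limits described above can also be computed in closed form: the first `totalElements % totalPartitions` partitions each take one extra element. This is a hypothetical sketch, not the code from the PR; `partitionLimits` and its parameters are made-up names:

```java
public class PartitionLimitsSketch {

	// Split the closed range [start, start + totalElements - 1] into
	// totalPartitions contiguous slices; the first (totalElements % totalPartitions)
	// slices each get one extra element. Returns {inclusive start, exclusive end}.
	static long[] partitionLimits(long start, long totalElements, int totalPartitions, int partitionIdx) {
		long minSize = totalElements / totalPartitions;
		long remainder = totalElements % totalPartitions;
		// Closed-form start: every earlier partition contributes minSize elements,
		// plus one extra for each of the first min(partitionIdx, remainder) partitions.
		long startOffset = start + partitionIdx * minSize + Math.min(partitionIdx, remainder);
		long size = partitionIdx < remainder ? minSize + 1 : minSize;
		return new long[] { startOffset, startOffset + size };
	}

	public static void main(String[] args) {
		// 10 elements over 3 partitions -> sizes 4, 3, 3
		for (int p = 0; p < 3; p++) {
			long[] limits = partitionLimits(0L, 10L, 3, p);
			System.out.println("partition " + p + ": [" + limits[0] + ", " + limits[1] + ")");
		}
	}
}
```

Because the limits depend only on {{maxParallelism}} and the range, each subtask can derive its partitions without any coordination, and checkpointed state stays at two longs per partition.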



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
