flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
Date Fri, 24 Mar 2017 18:28:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940878#comment-15940878 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107969496
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
    @@ -17,100 +17,164 @@
     
     package org.apache.flink.streaming.runtime.tasks;
     
    -import java.util.concurrent.ArrayBlockingQueue;
    -import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -
     import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
     import org.apache.flink.streaming.api.watermark.Watermark;
    -import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
    +import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.types.Either;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * TODO write javadoc
    + * <p>
    + * - open a list state per snapshot process
    + * - book-keep snapshot logs
     + * - Clean up state when a savepoint is complete - ONLY in-transit records that do NOT belong in other snapshots
    + *
    + * @param <IN>
    + */
     @Internal
     -public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
    +public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> {
     
     	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
     
     	private volatile boolean running = true;
     
    -	// ------------------------------------------------------------------------
    -	
    +	private volatile RecordWriterOutput<IN>[] outputs;
    +
    +	private UpstreamLogger<IN> upstreamLogger;
    +
    +	private Object lock;
    +
    +	@Override
    +	public void init() throws Exception {
    +		this.lock = getCheckpointLock();
    +		getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
    +		operatorChain = new OperatorChain<>(this);
    +		this.upstreamLogger = (UpstreamLogger<IN>) operatorChain.getHeadOperator();
    +	}
    +
     	@Override
     	protected void run() throws Exception {
    -		
    +
     		final String iterationId = getConfiguration().getIterationId();
     		if (iterationId == null || iterationId.length() == 0) {
     			throw new Exception("Missing iteration ID in the task configuration");
     		}
    -		
    -		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
    -				getEnvironment().getTaskInfo().getIndexOfThisSubtask());
    -		
    +		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
    +			getEnvironment().getTaskInfo().getIndexOfThisSubtask());
     		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
     		final boolean shouldWait = iterationWaitTime > 0;
     
    -		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
    +		final BlockingQueue<Either<StreamRecord<IN>, CheckpointBarrier>> dataChannel
    +			= new ArrayBlockingQueue<>(1);
     
     		// offer the queue for the tail
     		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
     		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
     
     		// do the work 
     		try {
    -			@SuppressWarnings("unchecked")
    -			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
    +			outputs = (RecordWriterOutput<IN>[]) getStreamOutputs();
     
     			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
     			if (isSerializingTimestamps()) {
    -				for (RecordWriterOutput<OUT> output : outputs) {
    +				for (RecordWriterOutput<IN> output : outputs) {
     					output.emitWatermark(new Watermark(Long.MAX_VALUE));
     				}
     			}
     
    +			synchronized (lock) {
    +				//emit in-flight events in the upstream log upon recovery
    +				for (StreamRecord<IN> rec : upstreamLogger.getReplayLog()) {
    --- End diff --
    
    Maybe it would make sense to put this in a while loop and check "running" as well, to cancel early if the job is cancelled during replay.
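    
    A minimal sketch of that suggestion (the loop body and the use of collect are assumptions for illustration; only getReplayLog(), "outputs", and the volatile "running" flag appear in the diff above):
    
        synchronized (lock) {
            // Hypothetical replay loop: re-check the task's volatile "running"
            // flag on every record so that a concurrent cancel() aborts the
            // replay early instead of draining the whole upstream log.
            Iterator<StreamRecord<IN>> replay = upstreamLogger.getReplayLog().iterator();
            while (running && replay.hasNext()) {
                StreamRecord<IN> record = replay.next();
                for (RecordWriterOutput<IN> output : outputs) {
                    output.collect(record);
                }
            }
        }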


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution graph. An alternative scheme can potentially include records in-transit through the back-edges of a cyclic execution graph (ABS [1]) to achieve the same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along the following lines:
> 1) Upon a barrier being triggered in an IterationHead from the TaskManager, block output and start upstream backup of all records forwarded from the respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource should finalize the snapshot, unblock its output, emit all in-transit records in FIFO order, and continue the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve as the initial implementation (see the sketch below).
> [1] http://arxiv.org/abs/1506.08603
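
A rough sketch of steps 1-3 in plain Java follows. All class and method names here are hypothetical illustrations of the protocol described above, not the actual Flink implementation:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical sketch of the ABS cycle protocol; all names are
    // illustrative and do not come from the Flink code base.
    class IterationSourceSketch<T> {

        private final Queue<T> upstreamLog = new ArrayDeque<>(); // in-transit backup
        private boolean logging = false;

        // Step 1: on a barrier trigger from the TaskManager, block output for
        // feedback records and start backing them up instead of forwarding.
        void onCheckpointTriggered(long checkpointId) {
            logging = true;
        }

        // Records arriving over the back-edge from the IterationSink.
        void onFeedbackRecord(T record) {
            if (logging) {
                upstreamLog.add(record); // back up records in transit
            } else {
                emit(record);            // normal forwarding
            }
        }

        // Steps 2-3: the barrier eventually returns around the cycle from the
        // IterationSink; finalize the snapshot, then unblock and replay FIFO.
        void onBarrierFromSink(long checkpointId) {
            snapshotState(upstreamLog); // persist the logged in-transit records
            logging = false;
            T record;
            while ((record = upstreamLog.poll()) != null) {
                emit(record);           // emit backed-up records first, then resume
            }
        }

        private void emit(T record) { /* forward downstream */ }

        private void snapshotState(Queue<T> log) { /* write to the state backend */ }
    }

On restore, replaying the persisted log before processing new input gives the FIFO replay described in the final step above.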



