flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyfora <...@git.apache.org>
Subject [GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Date Fri, 24 Mar 2017 18:28:06 GMT
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107969496
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
---
    @@ -17,100 +17,164 @@
     
     package org.apache.flink.streaming.runtime.tasks;
     
    -import java.util.concurrent.ArrayBlockingQueue;
    -import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -
     import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
     import org.apache.flink.streaming.api.watermark.Watermark;
    -import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
    +import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.types.Either;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * TODO write javadoc
    + * <p>
    + * - open a list state per snapshot process
    + * - book-keep snapshot logs
    + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT
belong in other snapshots
    + *
    + * @param <IN>
    + */
     @Internal
    -public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT>
{
    +public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> {
     
     	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
     
     	private volatile boolean running = true;
     
    -	// ------------------------------------------------------------------------
    -	
    +	private volatile RecordWriterOutput<IN>[] outputs;
    +
    +	private UpstreamLogger<IN> upstreamLogger;
    +
    +	private Object lock;
    +
    +	@Override
    +	public void init() throws Exception {
    +		this.lock = getCheckpointLock();
    +		getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
    +		operatorChain = new OperatorChain<>(this);
    +		this.upstreamLogger = (UpstreamLogger<IN>) operatorChain.getHeadOperator();
    +	}
    +
     	@Override
     	protected void run() throws Exception {
    -		
    +
     		final String iterationId = getConfiguration().getIterationId();
     		if (iterationId == null || iterationId.length() == 0) {
     			throw new Exception("Missing iteration ID in the task configuration");
     		}
    -		
    -		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId
,
    -				getEnvironment().getTaskInfo().getIndexOfThisSubtask());
    -		
    +		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
    +			getEnvironment().getTaskInfo().getIndexOfThisSubtask());
     		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
     		final boolean shouldWait = iterationWaitTime > 0;
     
    -		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
    +		final BlockingQueue<Either<StreamRecord<IN>, CheckpointBarrier>>
dataChannel
    +			= new ArrayBlockingQueue<>(1);
     
     		// offer the queue for the tail
     		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
     		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
     
     		// do the work 
     		try {
    -			@SuppressWarnings("unchecked")
    -			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
    +			outputs = (RecordWriterOutput<IN>[]) getStreamOutputs();
     
     			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
     			if (isSerializingTimestamps()) {
    -				for (RecordWriterOutput<OUT> output : outputs) {
    +				for (RecordWriterOutput<IN> output : outputs) {
     					output.emitWatermark(new Watermark(Long.MAX_VALUE));
     				}
     			}
     
    +			synchronized (lock) {
    +				//emit in-flight events in the upstream log upon recovery
    +				for (StreamRecord<IN> rec : upstreamLogger.getReplayLog()) {
    --- End diff --
    
    Maybe it would make sense to replace this with a while loop that also checks the "running" flag,
so that the replay can stop early if the job is cancelled while the log is being replayed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message