flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
Date Fri, 24 Mar 2017 18:28:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940875#comment-15940875 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107968567
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
    @@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
     	 * Creates the identification string with which head and tail task find the shared blocking
     	 * queue for the back channel. The identification string is unique per parallel head/tail pair
     	 * per iteration per job.
    -	 * 
    -	 * @param jid The job ID.
    -	 * @param iterationID The id of the iteration in the job.
    +	 *
    +	 * @param jid          The job ID.
    +	 * @param iterationID  The id of the iteration in the job.
     	 * @param subtaskIndex The parallel subtask number
     	 * @return The identification string.
     	 */
     	public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
     		return jid + "-" + iterationID + "-" + subtaskIndex;
     	}
    +
    +	/**
    +	 * An internal operator that solely serves as a state logging facility for persisting,
    +	 * partitioning and restoring output logs for dataflow cycles consistently. To support concurrency,
    +	 * logs are being sliced proportionally to the number of concurrent snapshots. This allows committed
    +	 * output logs to be uniquely identified and cleared after each complete checkpoint.
    +	 * <p>
    +	 * The design is based on the following assumptions:
    +	 * <p>
    +	 * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
    +	 * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
    +	 * - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that
    +	 * gives a singular view of the log.
    +	 * <p>
    +	 * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
    +	 *
    +	 * @param <IN>
    +	 */
    +	public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +
    +		private final StreamConfig config;
    +
    +		private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>();
    +
    +		private UpstreamLogger(StreamConfig config) {
    +			this.config = config;
    +		}
    +
    +		public void logRecord(StreamRecord<IN> record) throws Exception {
    +			if (!slicedLog.isEmpty()) {
    +				slicedLog.getLast().add(record);
    +			}
    +		}
    +
    +		public void createSlice(String sliceID) throws Exception {
    +			ListState<StreamRecord<IN>> nextSlice =
    +				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
    +					config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
    +			slicedLog.addLast(nextSlice);
    +		}
    +
    +		public void discardSlice() {
    +			ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst();
    +			logToEvict.clear();
    +		}
    +
    +		public Iterable<StreamRecord<IN>> getReplayLog() throws Exception {
    +			final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
    +			Collections.sort(logSlices, new Comparator<String>() {
    +				@Override
    +				public int compare(String o1, String o2) {
    +					return Long.valueOf(o1).compareTo(Long.valueOf(o2));
    +				}
    +			});
    +
    +			final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>();
    +			for (String splitID : logSlices) {
    +				wrappedIterators.add(getOperatorStateBackend()
    +					.getOperatorState(new ListStateDescriptor<>(splitID,
    +						config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
    +			}
    +
    +			if (wrappedIterators.size() == 0) {
    +				return new Iterable<StreamRecord<IN>>() {
    +					@Override
    +					public Iterator<StreamRecord<IN>> iterator() {
    +						return Collections.emptyListIterator();
    +					}
    +				};
    +			}
    +
    +			return new Iterable<StreamRecord<IN>>() {
    +				@Override
    +				public Iterator<StreamRecord<IN>> iterator() {
    +
    +					return new Iterator<StreamRecord<IN>>() {
    +						int indx = 0;
    +						Iterator<StreamRecord<IN>> currentIterator = wrappedIterators.get(0);
    +
    +						@Override
    +						public boolean hasNext() {
    +							if (!currentIterator.hasNext()) {
    +								progressLog();
    +							}
    +							return currentIterator.hasNext();
    +						}
    +
    +						@Override
    +						public StreamRecord<IN> next() {
    +							if (!currentIterator.hasNext() && indx < wrappedIterators.size()) {
    +								progressLog();
    +							}
    +							return currentIterator.next();
    +						}
    +
    +						private void progressLog() {
    +							while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
    +								currentIterator = wrappedIterators.get(indx);
    +							}
    +						}
    +
    +						@Override
    +						public void remove() {
    +							throw new UnsupportedOperationException();
    +						}
    +
    +					};
    +				}
    +			};
    +		}
    +
    +		public void clearLog() throws Exception {
    +			for (String outputLogs : getOperatorStateBackend().getRegisteredStateNames()) {
    --- End diff --
    
    It's kind of bad that we can't remove the state completely, so we keep iterating over the
    cleared slices when replaying the log...
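
    For illustration only, here is a minimal in-memory sketch of the sliced-log idea from the
    Javadoc above. It is not the code in the pull request: plain Java collections stand in
    for Flink's operator state, and the class name SlicedLog is hypothetical. It assumes, as
    the Javadoc does, that slices are keyed by numerically ordered checkpoint IDs. Because a
    discarded slice is removed from the map entirely, replay never visits it again, which is
    the behaviour the comment above wishes the state backend allowed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the sliced log; not Flink API. */
class SlicedLog<T> {
    // Slices keyed by checkpoint ID. TreeMap keeps keys in numeric order,
    // so slice 9 sorts before slice 10 (string ordering would reverse them).
    private final TreeMap<Long, List<T>> slices = new TreeMap<>();

    /** Opens a new slice for the given checkpoint ID. */
    void createSlice(long checkpointId) {
        slices.put(checkpointId, new ArrayList<>());
    }

    /** Appends to the newest slice; the record is dropped if no slice is open. */
    void logRecord(T record) {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    /** Removes the oldest slice entirely; a no-op when there are no slices. */
    void discardSlice() {
        slices.pollFirstEntry();
    }

    /** Returns one view over all remaining slices, oldest slice first. */
    Iterable<T> getReplayLog() {
        final List<List<T>> ordered = new ArrayList<>(slices.values());
        return () -> {
            final Iterator<List<T>> sliceIterator = ordered.iterator();
            return new Iterator<T>() {
                private Iterator<T> current = Collections.emptyIterator();

                @Override
                public boolean hasNext() {
                    // Skip over exhausted or empty slices.
                    while (!current.hasNext() && sliceIterator.hasNext()) {
                        current = sliceIterator.next().iterator();
                    }
                    return current.hasNext();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return current.next();
                }
            };
        };
    }
}
```

    In this sketch next() delegates to hasNext(), so advancing across slice boundaries lives
    in one place and an exhausted log fails with NoSuchElementException.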


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution graph. An alternative scheme can potentially include records in transit through the back-edges of a cyclic execution graph (ABS [1]) to achieve the same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block output and start upstream backup of all records forwarded from the respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource should finalize the snapshot, unblock its output, emit all records in transit in FIFO order, and continue the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can be the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
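
The three steps and the restart rule quoted above can be sketched as a small single-threaded
model. This is only an illustration under stated assumptions: the class IterationHeadSketch
and its method names are hypothetical, it is not Flink's StreamIterationHead, and it handles
one snapshot at a time (no concurrent checkpoints). Step 2, the tail forwarding the barrier,
is modelled by the caller invoking onBarrierFromTail().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of an iteration head; names are illustrative, not Flink API. */
class IterationHeadSketch<T> {
    // Records emitted downstream, in emission order.
    private final List<T> output = new ArrayList<>();
    // In-transit records backed up while a snapshot is in progress.
    private final Queue<T> upstreamLog = new ArrayDeque<>();
    private boolean snapshotting = false;

    /** Step 1: a barrier is triggered; block output and start the upstream backup. */
    void onBarrierTriggered() {
        snapshotting = true;
    }

    /** A record arrives over the back-edge from the tail. */
    void onFeedbackRecord(T record) {
        if (snapshotting) {
            upstreamLog.add(record); // held back and logged, not emitted yet
        } else {
            output.add(record);
        }
    }

    /**
     * Step 3: the barrier returns over the back-edge. The logged records form the
     * snapshot; output is unblocked and the records are emitted in FIFO order.
     */
    List<T> onBarrierFromTail() {
        List<T> snapshot = new ArrayList<>(upstreamLog);
        snapshotting = false;
        while (!upstreamLog.isEmpty()) {
            output.add(upstreamLog.poll());
        }
        return snapshot;
    }

    /** Restart: emit the records of the restored snapshot before anything else. */
    void restore(List<T> snapshot) {
        output.addAll(snapshot);
    }

    List<T> emitted() {
        return output;
    }
}
```

After a restart, a fresh instance that calls restore(snapshot) before accepting new feedback
records emits the logged records first, which matches the restart rule in the description.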



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
