flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
Date Wed, 23 Nov 2016 10:58:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689751#comment-15689751 ]

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r89293393
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
---
    @@ -0,0 +1,453 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators.async;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
    + * and emit results from {@link AsyncCollector} to the next operators following it by
    + * calling {@link Output#collect(Object)}
    + */
    +@Internal
    +public class AsyncCollectorBuffer<IN, OUT> {
    +
    +	/**
    +	 * Max number of {@link AsyncCollector} in the buffer.
    +	 */
    +	private final int bufferSize;
    +
    +	private final AsyncDataStream.OutputMode mode;
    +
    +	private final AsyncWaitOperator<IN, OUT> operator;
    +
    +	/**
    +	 * Keep all {@code AsyncCollector} and their input {@link StreamElement}
    +	 */
    +	private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
    +	/**
    +	 * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get
the
    +	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
    +	 * is full since main thread waits on this lock. The StreamElement in
    +	 * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part
of all StreamElements
    +	 * in its queue. It will be kept in the operator state while snapshotting.
    +	 */
    +	private StreamElement extraStreamElement;
    +
    +	/**
    +	 * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
    +	 */
    +	private final Output<StreamRecord<OUT>> output;
    +	private final TimestampedCollector<OUT> timestampedCollector;
    +
    +	/**
    +	 * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
    +	 */
    +	private final Object lock;
    +
    +	private final Emitter emitter;
    +	private final Thread emitThread;
    +
    +	private IOException error;
    +
    +	public AsyncCollectorBuffer(
    +			int bufferSize,
    +			AsyncDataStream.OutputMode mode,
    +			Output<StreamRecord<OUT>> output,
    +			TimestampedCollector<OUT> collector,
    +			Object lock,
    +			AsyncWaitOperator operator) {
    +		Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater
than 0.");
    +		Preconditions.checkNotNull(output, "Output should not be NULL.");
    +		Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
    +		Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
    +		Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be
NULL.");
    +
    +		this.bufferSize = bufferSize;
    +		this.mode = mode;
    +		this.output = output;
    +		this.timestampedCollector = collector;
    +		this.operator = operator;
    +		this.lock = lock;
    +
    +		this.emitter = new Emitter();
    +		this.emitThread = new Thread(emitter);
    +	}
    +
    +	/**
    +	 * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be
created and returned
    +	 * corresponding to the input StreamRecord.
    +	 * <p>
    +	 * If buffer is full, caller will wait until a new space is available.
    +	 *
    +	 * @param record StreamRecord
    +	 * @return An AsyncCollector
    +	 * @throws Exception InterruptedException or IOException from AsyncCollector.
    +	 */
    +	public AsyncCollector<IN, OUT> addStreamRecord(StreamRecord<IN> record)
throws InterruptedException, IOException {
    +		while (queue.size() >= bufferSize) {
    +			// hold the input StreamRecord until it is placed in the buffer
    +			extraStreamElement = record;
    +
    +			lock.wait();
    +		}
    +
    +		if (error != null) {
    +			throw error;
    +		}
    +
    +		AsyncCollector<IN, OUT> collector = new AsyncCollector(this);
    +
    +		queue.put(collector, record);
    +
    +		extraStreamElement = null;
    +
    +		return collector;
    +	}
    +
    +	/**
    +	 * Add a {@link Watermark} into queue. A new AsyncCollector will be created and returned.
    +	 * <p>
    +	 * If queue is full, caller will wait here.
    +	 *
    +	 * @param watermark Watermark
    +	 * @return AsyncCollector
    +	 * @throws Exception InterruptedException or IOException from AsyncCollector.
    +	 */
    +	public AsyncCollector<IN, OUT> addWatermark(Watermark watermark) throws InterruptedException,
IOException {
    +		return processMark(watermark);
    +	}
    +
    +	/**
    +	 * Add a {@link LatencyMarker} into queue. A new AsyncCollector will be created and
returned.
    +	 * <p>
    +	 * If queue is full, caller will wait here.
    +	 *
    +	 * @param latencyMarker LatencyMarker
    +	 * @return AsyncCollector
    +	 * @throws Exception InterruptedException or IOException from AsyncCollector.
    +	 */
    +	public AsyncCollector<IN, OUT> addLatencyMarker(LatencyMarker latencyMarker) throws
InterruptedException, IOException {
    +		return processMark(latencyMarker);
    +	}
    +
    +	private AsyncCollector<IN, OUT> processMark(StreamElement mark) throws InterruptedException,
IOException {
    +		while (queue.size() >= bufferSize) {
    +			// hold the input StreamRecord until it is placed in the buffer
    +			extraStreamElement = mark;
    +
    +			lock.wait();
    +		}
    +
    +		if (error != null) {
    +			throw error;
    +		}
    +
    +		AsyncCollector<IN, OUT> collector = new AsyncCollector(this, true);
    +
    +		queue.put(collector, mark);
    +
    +		extraStreamElement = null;
    +
    +		return collector;
    +	}
    +
    +	/**
    +	 * Notify the emitter thread and main thread that an AsyncCollector has completed.
    +	 *
    +	 * @param collector Completed AsyncCollector
    +	 */
    +	void markCollectorCompleted(AsyncCollector<IN, OUT> collector) {
    +		synchronized (lock) {
    +			collector.markDone();
    +
    +			// notify main thread to keep working
    +			lock.notifyAll();
    +		}
    +	}
    +
    +	/**
    +	 * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks
have returned yet.
    +	 *
    +	 * @throws Exception InterruptedException or IOException from AsyncCollector.
    +	 */
    +	void waitEmpty() throws InterruptedException, IOException {
    +		while (queue.size() != 0) {
    +			if (error != null) {
    +				throw error;
    +			}
    +
    +			lock.wait();
    +		}
    +	}
    +
    +	public void startEmitterThread() {
    +		this.emitThread.start();
    +	}
    +
    +	public void stopEmitterThread() {
    +		emitter.stop();
    +
    +		emitThread.interrupt();
    +	}
    +
    +	/**
    +	 * Get all StreamElements in the AsyncCollector queue.
    +	 * <p>
    +	 * Emitter Thread can not output records and will wait for a while due to checkpoiting
procedure
    +	 * holding the checkpoint lock.
    +	 *
    +	 * @return A List containing StreamElements.
    +	 */
    +	public List<StreamElement> getStreamElementsInBuffer() {
    +		List<StreamElement> ret = new ArrayList<>(queue.size());
    +		for (Map.Entry<AsyncCollector<IN, OUT>, StreamElement> entry : queue.entrySet())
{
    +			ret.add(entry.getValue());
    +		}
    +
    +		// add the lonely input outside of queue into return set.
    +		if (extraStreamElement != null) {
    +			ret.add(extraStreamElement);
    +		}
    +
    +		return ret;
    +	}
    +
    +	@VisibleForTesting
    +	public Map<AsyncCollector<IN, OUT>, StreamElement> getQueue() {
    +		return this.queue;
    +	}
    +
    +	/**
    +	 * A working thread to output results from {@link AsyncCollector} to the next operator.
    +	 */
    +	private class Emitter implements Runnable {
    +		private volatile boolean running = true;
    +
    +		private void output(AsyncCollector collector, StreamElement element) throws Exception
{
    --- End diff --
    
    You mean the `StreamElement` in the parameter list?


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a DataStream.
> The classic example would be joining against an external database in order to enrich a stream
> with extra information.
> It would be nice to add general support for this type of operation in the Flink API.
> Ideally this could simply take the form of a new operator that manages async operations,
> keeps so many of them in flight, and then emits results to downstream operators as the async
> operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message