Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 15B4A200BD3 for ; Mon, 21 Nov 2016 12:32:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 152C3160AEC; Mon, 21 Nov 2016 11:32:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 15F9D160B1E for ; Mon, 21 Nov 2016 12:32:12 +0100 (CET) Received: (qmail 9122 invoked by uid 500); 21 Nov 2016 11:32:12 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 9107 invoked by uid 99); 21 Nov 2016 11:32:12 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Nov 2016 11:32:12 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 30B6E2C4C70 for ; Mon, 21 Nov 2016 11:32:12 +0000 (UTC) Date: Mon, 21 Nov 2016 11:32:12 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Mon, 21 Nov 2016 11:32:14 -0000 [ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683297#comment-15683297 ] ASF GitHub Bot commented on FLINK-4391: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: 
https://github.com/apache/flink/pull/2629#discussion_r88712331 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** + * Max number of {@link AsyncCollector} in the buffer. + */ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** + * Keep all {@code AsyncCollector} and their input {@link StreamElement} + */ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** + * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} + * is full since main thread waits on this lock. The StreamElement in + * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements + * in its queue. It will be kept in the operator state while snapshotting. 
+ */ + private StreamElement extraStreamElement; + + /** + * {@link TimestampedCollector} and {@link Output} to collect results and watermarks. + */ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** + * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} + */ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; + this.output = output; + this.timestampedCollector = collector; + this.operator = operator; + this.lock = lock; + + this.emitter = new Emitter(); + this.emitThread = new Thread(emitter); + } + + /** + * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be created and returned + * corresponding to the input StreamRecord. + *

+ * If buffer is full, caller will wait until a new space is available. + * + * @param record StreamRecord + * @return An AsyncCollector + * @throws Exception InterruptedException or IOException from AsyncCollector. + */ + public AsyncCollector addStreamRecord(StreamRecord record) throws InterruptedException, IOException { + while (queue.size() >= bufferSize) { + // hold the input StreamRecord until it is placed in the buffer + extraStreamElement = record; + + lock.wait(); + } + + if (error != null) { + throw error; + } + + AsyncCollector collector = new AsyncCollector(this); + + queue.put(collector, record); + + extraStreamElement = null; + + return collector; + } + + /** + * Add a {@link Watermark} into queue. A new AsyncCollector will be created and returned. + *

+ * If queue is full, caller will wait here. + * + * @param watermark Watermark + * @return AsyncCollector + * @throws Exception InterruptedException or IOException from AsyncCollector. + */ + public AsyncCollector addWatermark(Watermark watermark) throws InterruptedException, IOException { + return processMark(watermark); + } + + /** + * Add a {@link LatencyMarker} into queue. A new AsyncCollector will be created and returned. + *

+ * If queue is full, caller will wait here. + * + * @param latencyMarker LatencyMarker + * @return AsyncCollector + * @throws Exception InterruptedException or IOException from AsyncCollector. + */ + public AsyncCollector addLatencyMarker(LatencyMarker latencyMarker) throws InterruptedException, IOException { + return processMark(latencyMarker); + } + + private AsyncCollector processMark(StreamElement mark) throws InterruptedException, IOException { + while (queue.size() >= bufferSize) { + // hold the input StreamRecord until it is placed in the buffer + extraStreamElement = mark; + + lock.wait(); + } + + if (error != null) { + throw error; + } + + AsyncCollector collector = new AsyncCollector(this, true); + + queue.put(collector, mark); + + extraStreamElement = null; + + return collector; + } + + /** + * Notify the emitter thread and main thread that an AsyncCollector has completed. + * + * @param collector Completed AsyncCollector + */ + void markCollectorCompleted(AsyncCollector collector) { + synchronized (lock) { + collector.markDone(); + + // notify main thread to keep working + lock.notifyAll(); + } + } + + /** + * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks have returned yet. + * + * @throws Exception InterruptedException or IOException from AsyncCollector. + */ + void waitEmpty() throws InterruptedException, IOException { + while (queue.size() != 0) { + if (error != null) { + throw error; + } + + lock.wait(); + } + } + + public void startEmitterThread() { + this.emitThread.start(); + } + + public void stopEmitterThread() { + emitter.stop(); + + emitThread.interrupt(); + } + + /** + * Get all StreamElements in the AsyncCollector queue. + *

+ * Emitter Thread can not output records and will wait for a while due to checkpointing procedure + * holding the checkpoint lock. + * + * @return A List containing StreamElements. + */ + public List getStreamElementsInBuffer() { + List ret = new ArrayList<>(queue.size()); + for (Map.Entry, StreamElement> entry : queue.entrySet()) { + ret.add(entry.getValue()); + } + + // add the lonely input outside of queue into return set. + if (extraStreamElement != null) { + ret.add(extraStreamElement); + } + + return ret; + } + + @VisibleForTesting + public Map, StreamElement> getQueue() { + return this.queue; + } + + /** + * A working thread to output results from {@link AsyncCollector} to the next operator. + */ + private class Emitter implements Runnable { + private volatile boolean running = true; + + private void output(AsyncCollector collector, StreamElement element) throws Exception { + List result = collector.getResult(); + + // update timestamp for output stream records based on the input stream record. + if (element == null) { + throw new Exception("No input stream element for current AsyncCollector"); + } + + if (element.isRecord()) { + if (result == null) { + throw new Exception("Result for stream record "+element+" is null"); + } + + timestampedCollector.setTimestamp(element.asRecord()); + for (OUT val : result) { + timestampedCollector.collect(val); + } + } + else if (element.isWatermark()) { + output.emitWatermark(element.asWatermark()); + } + else if (element.isLatencyMarker()) { + operator.sendLatencyMarker(element.asLatencyMarker()); + } + else { + throw new IOException("Unknown input record: "+element); + } + } + + /** + * Emit results from the finished head collector and its following finished ones. 
+ */ + private void orderedProcess() throws IOException { + Iterator, StreamElement>> iterator = queue.entrySet().iterator(); + + while (iterator.hasNext()) { + try { + Map.Entry, StreamElement> entry = iterator.next(); + + AsyncCollector collector = entry.getKey(); + + if (!collector.isDone()) { + break; + } + + output(collector, entry.getValue()); + + iterator.remove(); + + lock.notifyAll(); + } + catch (Exception e) { + throw new IOException(e); --- End diff -- Error message would be quite helpful. > Provide support for asynchronous operations over streams > -------------------------------------------------------- > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Jamie Grier > Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a DataStream. The classic example would be joining against an external database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the Flink API. Ideally this could simply take the form of a new operator that manages async operations, keeps so many of them in flight, and then emits results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)