flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
Date Thu, 03 Nov 2016 15:31:23 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633160#comment-15633160
] 

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86347448
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators.async;
    +
    +import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    +import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.internal.util.reflection.Whitebox;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * Tests for {@link AsyncCollectorBuffer}. These test that:
    + *
    + * <ul>
    + *     <li>Add a new item into the buffer</li>
    + *     <li>Ordered mode processing</li>
    + *     <li>Unordered mode processing</li>
    + *     <li>Error handling</li>
    + * </ul>
    + */
    +public class AsyncCollectorBufferTest {
    +	private AsyncFunction<Integer, Integer> function;
    +
    +	private AsyncWaitOperator<Integer, Integer> operator;
    +
    +	private AsyncCollectorBuffer<Integer, Integer> buffer;
    +
    +	private Output<StreamRecord<Integer>> output;
    +
    +	@Before
    +	public void setUp() throws Exception {
    +		function = new AsyncFunction<Integer, Integer>() {
    +			@Override
    +			public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector)
throws Exception {
    +
    +			}
    +		};
    +
    +		operator = new AsyncWaitOperator<>(function);
    +		Class<?>[] classes = AbstractStreamOperator.class.getDeclaredClasses();
    +		Class<?> latencyClass = null;
    +		for (Class<?> c : classes) {
    +			if (c.getName().indexOf("LatencyGauge") != -1) {
    +				latencyClass = c;
    +			}
    +		}
    +
    +		Constructor<?> explicitConstructor = latencyClass.getDeclaredConstructors()[0];
    +		explicitConstructor.setAccessible(true);
    +		Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
    +
    +		output = new FakedOutput(new ArrayList<Long>());
    +		TimestampedCollector<Integer> collector =new TimestampedCollector(output);
    +		buffer =
    +			new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.ORDERED, operator);
    +		buffer.setOutput(collector, output);
    +
    +		Whitebox.setInternalState(operator, "output", output);
    +	}
    +
    +	@Test
    +	public void testAdd() throws Exception {
    +		Thread.sleep(1000);
    +		buffer.add(new Watermark(0l));
    +		buffer.add(new LatencyMarker(111L, 1, 1));
    +		Assert.assertEquals(((SimpleLinkedList) Whitebox.getInternalState(buffer, "queue")).size(),
2);
    +		Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).size(),
2);
    +		Assert.assertEquals(((Map) Whitebox.getInternalState(buffer, "collectorToQueue")).size(),
2);
    +
    +		AsyncCollector collector = (AsyncCollector)((SimpleLinkedList) Whitebox.getInternalState(buffer,
"queue")).get(0);
    +		Watermark ret = ((StreamElement)((Map) Whitebox.getInternalState(buffer, "collectorToStreamElement")).get(collector)).asWatermark();
    +		Assert.assertEquals(ret.getTimestamp(), 0l);
    +
    +		AsyncCollector collector2 = (AsyncCollector)((SimpleLinkedList) Whitebox.getInternalState(buffer,
"queue")).get(1);
    +		LatencyMarker latencyMarker = ((StreamElement)((Map) Whitebox.getInternalState(buffer,
"collectorToStreamElement")).get(collector2)).asLatencyMarker();
    +		Assert.assertEquals(latencyMarker.getMarkedTime(), 111l);
    +
    +		SimpleLinkedList list = (SimpleLinkedList) Whitebox.getInternalState(buffer, "queue");
    +		Assert.assertEquals(list.node(0), ((Map) Whitebox.getInternalState(buffer, "collectorToQueue")).get(collector));
    +
    +		List<StreamElement> elements = buffer.getStreamElementsInBuffer();
    +		Assert.assertEquals(elements.size(), 2);
    +	}
    +
    +	public class OrderedPutThread extends Thread {
    +		final int ASYNC_COLLECTOR_NUM = 7;
    +		int[] orderedSeq = new int[] {0, 1, 2, 3, 4, 5, 6};
    +		int[] sleepTimeArr = new int[] {5, 7, 3, 0, 1, 9, 9};
    +
    +		AsyncCollectorBuffer<Integer, Integer> buffer;
    +		ExecutorService service = Executors.newFixedThreadPool(10);
    +
    +		boolean throwExcept = false;
    +		boolean orderedMode = false;
    +
    +		public OrderedPutThread(AsyncCollectorBuffer buffer, boolean except, boolean orderedMode)
{
    +			this.buffer = buffer;
    +			this.throwExcept = except;
    +			this.orderedMode = orderedMode;
    +		}
    +
    +		public OrderedPutThread(AsyncCollectorBuffer buffer, boolean except) {
    +			this(buffer, except, true);
    +		}
    +
    +		public OrderedPutThread(AsyncCollectorBuffer buffer) {
    +			this(buffer, false);
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				for (int idx = 0; idx < ASYNC_COLLECTOR_NUM; ++idx) {
    +					int i = orderedSeq[idx];
    +					final int sleepTS = sleepTimeArr[idx]*1000;
    +
    +					if (i == 3)
    +						buffer.add(new Watermark(333l));
    +					else if (i == 6)
    +						buffer.add(new LatencyMarker(111L, 0, 0));
    +					else {
    +						StreamRecord record = new StreamRecord(i);
    +						record.setTimestamp(i*i);
    +						final AsyncCollector collector = buffer.add(record);
    +
    +						final int v = i;
    +						service.submit(new Runnable() {
    +							@Override
    +							public void run() {
    +								try {
    +									int sleep = sleepTS;
    +									Thread.sleep(sleep);
    +									if (throwExcept)
    +										collector.collect(new Exception("wahahaha..."));
    +									else {
    +										List<Integer> ret = new ArrayList<Integer>();
    +										ret.add(v);
    +										collector.collect(ret);
    +									}
    +								} catch (Exception e) {
    +									e.printStackTrace();
    --- End diff --
    
    Shouldn't we let the test fail in case of an `Exception`?


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a DataStream. The classic example would be joining against an external database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the Flink API. Ideally this could simply take the form of a new operator that manages async operations, keeps so many of them in flight, and then emits results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message