Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C18321779F for ; Wed, 15 Apr 2015 09:38:42 +0000 (UTC) Received: (qmail 35193 invoked by uid 500); 15 Apr 2015 09:38:42 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 35073 invoked by uid 500); 15 Apr 2015 09:38:42 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 34836 invoked by uid 99); 15 Apr 2015 09:38:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Apr 2015 09:38:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50D91E0A1E; Wed, 15 Apr 2015 09:38:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gyfora@apache.org To: commits@flink.apache.org Date: Wed, 15 Apr 2015 09:38:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/19] flink git commit: [streaming] Major internal renaming and restructure http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java deleted file mode 100644 index aa2a1bc..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.AbstractCollection; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.apache.commons.collections.BoundedCollection; -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUnderflowException; - -@SuppressWarnings("rawtypes") -public class NullableCircularBuffer extends AbstractCollection implements Buffer, - BoundedCollection, Serializable { - - /** Serialization version */ - private static final long serialVersionUID = 5603722811189451017L; - - /** Underlying storage array */ - private transient Object[] elements; - - /** Array index of first (oldest) buffer element */ - private transient int start = 0; - - /** - * Index mod maxElements of the array position following the last buffer - * element. Buffer elements start at elements[start] and "wrap around" - * elements[maxElements-1], ending at elements[decrement(end)]. For example, - * elements = {c,a,b}, start=1, end=1 corresponds to the buffer [a,b,c]. - */ - private transient int end = 0; - - /** Flag to indicate if the buffer is currently full. */ - private transient boolean full = false; - - /** Capacity of the buffer */ - private final int maxElements; - - /** - * Constructs a new BoundedFifoBuffer big enough to hold 32 - * elements. - */ - public NullableCircularBuffer() { - this(32); - } - - /** - * Constructs a new BoundedFifoBuffer big enough to hold the - * specified number of elements. - * - * @param size - * the maximum number of elements for this fifo - * @throws IllegalArgumentException - * if the size is less than 1 - */ - public NullableCircularBuffer(int size) { - if (size <= 0) { - throw new IllegalArgumentException("The size must be greater than 0"); - } - elements = new Object[size]; - maxElements = elements.length; - } - - /** - * Constructs a new BoundedFifoBuffer big enough to hold all of - * the elements in the specified collection. That collection's elements will - * also be added to the buffer. - * - * @param coll - * the collection whose elements to add, may not be null - * @throws NullPointerException - * if the collection is null - */ - @SuppressWarnings("unchecked") - public NullableCircularBuffer(Collection coll) { - this(coll.size()); - addAll(coll); - } - - // ----------------------------------------------------------------------- - /** - * Write the buffer out using a custom routine. - * - * @param out - * the output stream - * @throws IOException - */ - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - out.writeInt(size()); - for (Iterator it = iterator(); it.hasNext();) { - out.writeObject(it.next()); - } - } - - /** - * Read the buffer in using a custom routine. - * - * @param in - * the input stream - * @throws IOException - * @throws ClassNotFoundException - */ - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - elements = new Object[maxElements]; - int size = in.readInt(); - for (int i = 0; i < size; i++) { - elements[i] = in.readObject(); - } - start = 0; - full = (size == maxElements); - if (full) { - end = 0; - } else { - end = size; - } - } - - // ----------------------------------------------------------------------- - /** - * Returns the number of elements stored in the buffer. - * - * @return this buffer's size - */ - public int size() { - int size = 0; - - if (end < start) { - size = maxElements - start + end; - } else if (end == start) { - size = (full ? maxElements : 0); - } else { - size = end - start; - } - - return size; - } - - /** - * Returns true if this buffer is empty; false otherwise. - * - * @return true if this buffer is empty - */ - public boolean isEmpty() { - return size() == 0; - } - - /** - * Returns true if this collection is full and no new elements can be added. - * - * @return true if the collection is full - */ - public boolean isFull() { - return size() == maxElements; - } - - /** - * Gets the maximum size of the collection (the bound). - * - * @return the maximum number of elements the collection can hold - */ - public int maxSize() { - return maxElements; - } - - /** - * Clears this buffer. - */ - public void clear() { - full = false; - start = 0; - end = 0; - Arrays.fill(elements, null); - } - - /** - * Adds the given element to this buffer. - * - * @param element - * the element to add - * @return true, always - */ - public boolean add(Object element) { - - if (isFull()) { - remove(); - } - - elements[end++] = element; - - if (end >= maxElements) { - end = 0; - } - - if (end == start) { - full = true; - } - - return true; - } - - /** - * Returns the least recently inserted element in this buffer. - * - * @return the least recently inserted element - * @throws BufferUnderflowException - * if the buffer is empty - */ - public Object get() { - if (isEmpty()) { - throw new BufferUnderflowException("The buffer is already empty"); - } - - return elements[start]; - } - - /** - * Removes the least recently inserted element from this buffer. - * - * @return the least recently inserted element - * @throws BufferUnderflowException - * if the buffer is empty - */ - public Object remove() { - if (isEmpty()) { - throw new BufferUnderflowException("The buffer is already empty"); - } - - Object element = elements[start]; - - elements[start++] = null; - - if (start >= maxElements) { - start = 0; - } - - full = false; - - return element; - } - - /** - * Increments the internal index. - * - * @param index - * the index to increment - * @return the updated index - */ - private int increment(int index) { - index++; - if (index >= maxElements) { - index = 0; - } - return index; - } - - /** - * Decrements the internal index. - * - * @param index - * the index to decrement - * @return the updated index - */ - private int decrement(int index) { - index--; - if (index < 0) { - index = maxElements - 1; - } - return index; - } - - /** - * Returns an iterator over this buffer's elements. - * - * @return an iterator over this buffer's elements - */ - public Iterator iterator() { - return new Iterator() { - - private int index = start; - private int lastReturnedIndex = -1; - private boolean isFirst = full; - - public boolean hasNext() { - return isFirst || (index != end); - - } - - public Object next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - isFirst = false; - lastReturnedIndex = index; - index = increment(index); - return elements[lastReturnedIndex]; - } - - public void remove() { - if (lastReturnedIndex == -1) { - throw new IllegalStateException(); - } - - // First element can be removed quickly - if (lastReturnedIndex == start) { - NullableCircularBuffer.this.remove(); - lastReturnedIndex = -1; - return; - } - - int pos = lastReturnedIndex + 1; - if (start < lastReturnedIndex && pos < end) { - // shift in one part - System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos); - } else { - // Other elements require us to shift the subsequent - // elements - while (pos != end) { - if (pos >= maxElements) { - elements[pos - 1] = elements[0]; - pos = 0; - } else { - elements[decrement(pos)] = elements[pos]; - pos = increment(pos); - } - } - } - - lastReturnedIndex = -1; - end = decrement(end); - elements[end] = null; - full = false; - index = decrement(index); - } - - }; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java deleted file mode 100644 index 1c67c9e..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state; - -import org.apache.flink.runtime.state.OperatorState; - -/** - * Base class for representing operator states that can be repartitioned for - * state state and load balancing. - * - * @param - * The type of the operator state. - */ -public abstract class PartitionableState extends OperatorState { - - private static final long serialVersionUID = 1L; - - PartitionableState(T initialState) { - super(initialState); - } - - /** - * Repartitions(divides) the current state into the given number of new - * partitions. The created partitions will be used to redistribute then - * rebuild the state among the parallel instances of the operator. The - * implementation should reflect the partitioning of the input values to - * maintain correct operator behavior. - * - *

It is also assumed that if we would {@link #reBuild} the - * repartitioned state we would basically get the same as before. - * - * - * @param numberOfPartitions - * The desired number of partitions. The method must return an - * array of that size. - * @return The array containing the state part for each partition. - */ - public abstract OperatorState[] repartition(int numberOfPartitions); - - /** - * Rebuilds the current state partition from the given parts. Used for - * building the state after a re-balance phase. - * - * @param parts - * The state parts that will be used to rebuild the current - * partition. - * @return The rebuilt operator state. - */ - public abstract OperatorState reBuild(OperatorState... parts); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 54a7692..f489334 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -25,11 +25,11 @@ import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; -import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; -import org.apache.flink.streaming.api.function.aggregation.SumAggregator; -import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; +import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; +import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType; +import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.operators.StreamReduce; import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.junit.Test; @@ -107,13 +107,13 @@ public class AggregationFunctionTest { ReduceFunction maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX); List> sumList = MockContext.createAndExecute( - new StreamReduceInvokable>(sumFunction), getInputList()); + new StreamReduce>(sumFunction), getInputList()); List> minList = MockContext.createAndExecute( - new StreamReduceInvokable>(minFunction), getInputList()); + new StreamReduce>(minFunction), getInputList()); List> maxList = MockContext.createAndExecute( - new StreamReduceInvokable>(maxFunction), getInputList()); + new StreamReduce>(maxFunction), getInputList()); TypeInformation> typeInfo = TypeExtractor .getForObject(new Tuple2(1, 1)); @@ -123,15 +123,15 @@ public class AggregationFunctionTest { typeInfo, new ExecutionConfig()); List> groupedSumList = MockContext.createAndExecute( - new GroupedReduceInvokable>(sumFunction, keySelector), + new StreamGroupedReduce>(sumFunction, keySelector), getInputList()); List> groupedMinList = MockContext.createAndExecute( - new GroupedReduceInvokable>(minFunction, keySelector), + new StreamGroupedReduce>(minFunction, keySelector), getInputList()); List> groupedMaxList = MockContext.createAndExecute( - new GroupedReduceInvokable>(maxFunction, keySelector), + new StreamGroupedReduce>(maxFunction, keySelector), getInputList()); assertEquals(expectedSumList, sumList); @@ -141,11 +141,11 @@ public class AggregationFunctionTest { assertEquals(expectedGroupMinList, groupedMinList); assertEquals(expectedGroupMaxList, groupedMaxList); assertEquals(expectedSumList0, MockContext.createAndExecute( - new StreamReduceInvokable(sumFunction0), simpleInput)); + new StreamReduce(sumFunction0), simpleInput)); assertEquals(expectedMinList0, MockContext.createAndExecute( - new StreamReduceInvokable(minFunction0), simpleInput)); + new StreamReduce(minFunction0), simpleInput)); assertEquals(expectedMaxList0, MockContext.createAndExecute( - new StreamReduceInvokable(maxFunction0), simpleInput)); + new StreamReduce(maxFunction0), simpleInput)); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); try { @@ -229,11 +229,11 @@ public class AggregationFunctionTest { ReduceFunction maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX); List sumList = MockContext.createAndExecute( - new StreamReduceInvokable(sumFunction), getInputPojoList()); + new StreamReduce(sumFunction), getInputPojoList()); List minList = MockContext.createAndExecute( - new StreamReduceInvokable(minFunction), getInputPojoList()); + new StreamReduce(minFunction), getInputPojoList()); List maxList = MockContext.createAndExecute( - new StreamReduceInvokable(maxFunction), getInputPojoList()); + new StreamReduce(maxFunction), getInputPojoList()); TypeInformation typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1)); KeySelector keySelector = KeySelectorUtil.getSelectorForKeys( @@ -241,13 +241,13 @@ public class AggregationFunctionTest { typeInfo, config); List groupedSumList = MockContext.createAndExecute( - new GroupedReduceInvokable(sumFunction, keySelector), + new StreamGroupedReduce(sumFunction, keySelector), getInputPojoList()); List groupedMinList = MockContext.createAndExecute( - new GroupedReduceInvokable(minFunction, keySelector), + new StreamGroupedReduce(minFunction, keySelector), getInputPojoList()); List groupedMaxList = MockContext.createAndExecute( - new GroupedReduceInvokable(maxFunction, keySelector), + new StreamGroupedReduce(maxFunction, keySelector), getInputPojoList()); assertEquals(expectedSumList, sumList); @@ -257,11 +257,11 @@ public class AggregationFunctionTest { assertEquals(expectedGroupMinList, groupedMinList); assertEquals(expectedGroupMaxList, groupedMaxList); assertEquals(expectedSumList0, MockContext.createAndExecute( - new StreamReduceInvokable(sumFunction0), simpleInput)); + new StreamReduce(sumFunction0), simpleInput)); assertEquals(expectedMinList0, MockContext.createAndExecute( - new StreamReduceInvokable(minFunction0), simpleInput)); + new StreamReduce(minFunction0), simpleInput)); assertEquals(expectedMaxList0, MockContext.createAndExecute( - new StreamReduceInvokable(maxFunction0), simpleInput)); + new StreamReduce(maxFunction0), simpleInput)); } @Test @@ -324,16 +324,16 @@ public class AggregationFunctionTest { minByLastExpected.add(new Tuple2(0, 6)); assertEquals(maxByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable>(maxByFunctionFirst), + new StreamReduce>(maxByFunctionFirst), getInputList())); assertEquals(maxByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable>(maxByFunctionLast), + new StreamReduce>(maxByFunctionLast), getInputList())); assertEquals(minByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable>(minByFunctionLast), + new StreamReduce>(minByFunctionLast), getInputList())); assertEquals(minByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable>(minByFunctionFirst), + new StreamReduce>(minByFunctionFirst), getInputList())); } @@ -398,16 +398,16 @@ public class AggregationFunctionTest { minByLastExpected.add(new MyPojo(0, 6)); assertEquals(maxByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable(maxByFunctionFirst), + new StreamReduce(maxByFunctionFirst), getInputPojoList())); assertEquals(maxByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable(maxByFunctionLast), + new StreamReduce(maxByFunctionLast), getInputPojoList())); assertEquals(minByLastExpected, MockContext.createAndExecute( - new StreamReduceInvokable(minByFunctionLast), + new StreamReduce(minByFunctionLast), getInputPojoList())); assertEquals(minByFirstExpected, MockContext.createAndExecute( - new StreamReduceInvokable(minByFunctionFirst), + new StreamReduce(minByFunctionFirst), getInputPojoList())); } http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java index f527de4..4228314 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java @@ -30,8 +30,8 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.util.TestListResultSink; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.Collector; @@ -89,7 +89,7 @@ public class CoStreamTest { public boolean filter(Tuple2 value) throws Exception { return true; } - }).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER).groupBy(new KeySelector, Integer>() { + }).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new KeySelector, Integer>() { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index 31bd147..39c3f31 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.Collector; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java index 792c5d2..e72f2d9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java @@ -27,8 +27,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.streaming.api.function.source.FromElementsFunction; -import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.util.MockCollector; import org.apache.flink.streaming.util.MockSource; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index 9a19dde..f163b9e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -28,10 +28,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; -import org.apache.flink.streaming.api.function.co.CoMapFunction; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.function.co.CoWindowFunction; +import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.CoReduceFunction; +import org.apache.flink.streaming.api.functions.co.CoWindowFunction; import org.apache.flink.util.Collector; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java index bc7fe73..fc3e36f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -29,7 +29,7 @@ import java.util.Map; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.util.TestListResultSink; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java index 49b3bf8..32da578 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java @@ -21,8 +21,8 @@ import static org.junit.Assert.assertArrayEquals; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamvertex.MockRecordWriter; +import org.apache.flink.streaming.api.streamtask.MockRecordWriter; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.MockRecordWriterFactory; import org.apache.flink.util.Collector; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java deleted file mode 100644 index 969a06b..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class CounterInvokableTest { - - @Test - public void counterTest() { - CounterInvokable invokable = new CounterInvokable(); - - List expected = Arrays.asList(1L, 2L, 3L); - List actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three")); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java deleted file mode 100644 index 403dd17..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class FilterTest implements Serializable { - private static final long serialVersionUID = 1L; - - static class MyFilter implements FilterFunction { - private static final long serialVersionUID = 1L; - - @Override - public boolean filter(Integer value) throws Exception { - return value % 2 == 0; - } - } - - @Test - public void test() { - FilterInvokable invokable = new FilterInvokable(new MyFilter()); - - List expected = Arrays.asList(2, 4, 6); - List actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7)); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java deleted file mode 100644 index 7424e21..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.util.MockContext; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class FlatMapTest { - - public static final class MyFlatMap implements FlatMapFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Collector out) throws Exception { - if (value % 2 == 0) { - out.collect(value); - out.collect(value * value); - } - } - } - - @Test - public void flatMapTest() { - FlatMapInvokable invokable = new FlatMapInvokable(new MyFlatMap()); - - List expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64); - List actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java deleted file mode 100644 index 01375d2..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.util.MockContext; - -import org.junit.Test; - -public class GroupedFoldInvokableTest { - - private static class MyFolder implements FoldFunction { - - private static final long serialVersionUID = 1L; - - @Override - public String fold(String accumulator, Integer value) throws Exception { - return accumulator + value.toString(); - } - - } - - @Test - public void test() { - TypeInformation outType = TypeExtractor.getForObject("A string"); - - GroupedFoldInvokable invokable1 = new GroupedFoldInvokable( - new MyFolder(), new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Integer value) throws Exception { - return value.toString(); - } - }, "100", outType); - - List expected = Arrays.asList("1001","10011", "1002", "10022", "1003"); - List actual = MockContext.createAndExecute(invokable1, - Arrays.asList(1, 1, 2, 2, 3)); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java deleted file mode 100644 index ce47c67..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class GroupedReduceInvokableTest { - - private static class MyReducer implements ReduceFunction { - - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce(Integer value1, Integer value2) throws Exception { - return value1 + value2; - } - - } - - @Test - public void test() { - GroupedReduceInvokable invokable1 = new GroupedReduceInvokable( - new MyReducer(), new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }); - - List expected = Arrays.asList(1, 2, 2, 4, 3); - List actual = MockContext.createAndExecute(invokable1, - Arrays.asList(1, 1, 2, 2, 3)); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java deleted file mode 100644 index 5390ec9..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class MapTest { - - private static class Map implements MapFunction { - private static final long serialVersionUID = 1L; - - @Override - public String map(Integer value) throws Exception { - return "+" + (value + 1); - } - } - - @Test - public void mapInvokableTest() { - MapInvokable invokable = new MapInvokable(new Map()); - - List expectedList = Arrays.asList("+2", "+3", "+4"); - List actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3)); - - assertEquals(expectedList, actualList); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java deleted file mode 100644 index 11c44cd..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.datastream.StreamProjection; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class ProjectTest implements Serializable { - private static final long serialVersionUID = 1L; - - @Test - public void test() { - - TypeInformation> inType = TypeExtractor - .getForObject(new Tuple5(2, "a", 3, "b", - 4)); - - int[] fields = new int[] { 4, 4, 3 }; - Class[] classes = new Class[] { Integer.class, Integer.class, String.class }; - - @SuppressWarnings("unchecked") - ProjectInvokable, Tuple3> invokable = new ProjectInvokable, Tuple3>( - fields, - (TypeInformation>) StreamProjection - .extractFieldTypes(fields, classes, inType)); - - List> input = new ArrayList>(); - input.add(new Tuple5(2, "a", 3, "b", 4)); - input.add(new Tuple5(2, "s", 3, "c", 2)); - input.add(new Tuple5(2, "a", 3, "c", 2)); - input.add(new Tuple5(2, "a", 3, "a", 7)); - - List> expected = new ArrayList>(); - expected.add(new Tuple3(4, 4, "b")); - expected.add(new Tuple3(2, 2, "c")); - expected.add(new Tuple3(2, 2, "c")); - expected.add(new Tuple3(7, 7, "a")); - - assertEquals(expected, MockContext.createAndExecute(invokable, input)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java deleted file mode 100644 index 90a133b..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class StreamFoldTest { - - private static class MyFolder implements FoldFunction { - - private static final long serialVersionUID = 1L; - - @Override - public String fold(String accumulator, Integer value) throws Exception { - return accumulator + value.toString(); - } - } - - @Test - public void test() { - TypeInformation outType = TypeExtractor.getForObject("A string"); - StreamFoldInvokable invokable1 = new StreamFoldInvokable( - new MyFolder(), "", outType); - - List expected = Arrays.asList("1","11","112","1123","11233"); - List actual = MockContext.createAndExecute(invokable1, - Arrays.asList(1, 1, 2, 3, 3)); - - assertEquals(expected, actual); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java deleted file mode 100644 index ae866e6..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.util.MockContext; -import org.junit.Test; - -public class StreamReduceTest { - - private static class MyReducer implements ReduceFunction{ - - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce(Integer value1, Integer value2) throws Exception { - return value1+value2; - } - - } - - @Test - public void test() { - StreamReduceInvokable invokable1 = new StreamReduceInvokable( - new MyReducer()); - - List expected = Arrays.asList(1,2,4,7,10); - List actual = MockContext.createAndExecute(invokable1, - Arrays.asList(1, 1, 2, 3, 3)); - - assertEquals(expected, actual); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java deleted file mode 100644 index 195e67c..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class CoFlatMapTest implements Serializable { - private static final long serialVersionUID = 1L; - - private final static class MyCoFlatMap implements CoFlatMapFunction { - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(String value, Collector coll) { - for (int i = 0; i < value.length(); i++) { - coll.collect(value.substring(i, i + 1)); - } - } - - @Override - public void flatMap2(Integer value, Collector coll) { - coll.collect(value.toString()); - } - } - - @Test - public void coFlatMapTest() { - CoFlatMapInvokable invokable = new CoFlatMapInvokable( - new MyCoFlatMap()); - - List expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h", - "e", "3", "4", "5"); - List actualList = MockCoContext.createAndExecute(invokable, - Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5)); - - assertEquals(expectedList, actualList); - } - - @SuppressWarnings("unchecked") - @Test - public void multipleInputTest() { - LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); - - DataStream ds1 = env.fromElements(1, 3, 5); - DataStream ds2 = env.fromElements(2, 4).merge(ds1); - - try { - ds1.forward().merge(ds2); - fail(); - } catch (RuntimeException e) { - // expected - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java deleted file mode 100644 index a531884..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoGroupedReduceTest { - - private final static class MyCoReduceFunction implements - CoReduceFunction, Tuple2, String> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3 reduce1(Tuple3 value1, - Tuple3 value2) { - return new Tuple3(value1.f0, value1.f1 + value2.f1, value1.f2); - } - - @Override - public Tuple2 reduce2(Tuple2 value1, - Tuple2 value2) { - return new Tuple2(value1.f0, value1.f1 + value2.f1); - } - - @Override - public String map1(Tuple3 value) { - return value.f1; - } - - @Override - public String map2(Tuple2 value) { - return value.f1.toString(); - } - } - - @SuppressWarnings("unchecked") - @Test - public void coGroupedReduceTest() { - Tuple3 word1 = new Tuple3("a", "word1", "b"); - Tuple3 word2 = new Tuple3("b", "word2", "a"); - Tuple3 word3 = new Tuple3("a", "word3", "a"); - Tuple2 int1 = new Tuple2(2, 1); - Tuple2 int2 = new Tuple2(1, 2); - Tuple2 int3 = new Tuple2(0, 3); - Tuple2 int4 = new Tuple2(2, 4); - Tuple2 int5 = new Tuple2(1, 5); - - KeySelector, ?> keySelector0 = new KeySelector, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple3 value) throws Exception { - return value.f0; - } - }; - - KeySelector, ?> keySelector1 = new KeySelector, Integer>() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Tuple2 value) throws Exception { - return value.f0; - } - }; - - KeySelector, ?> keySelector2 = new KeySelector, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple3 value) throws Exception { - return value.f2; - } - }; - - CoGroupedReduceInvokable, Tuple2, String> invokable = new CoGroupedReduceInvokable, Tuple2, String>( - new MyCoReduceFunction(), keySelector0, keySelector1); - - List expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", - "7"); - - List actualList = MockCoContext.createAndExecute(invokable, - Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); - - assertEquals(expected, actualList); - - invokable = new CoGroupedReduceInvokable, Tuple2, String>( - new MyCoReduceFunction(), keySelector2, keySelector1); - - expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); - - actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), - Arrays.asList(int1, int2, int3, int4, int5)); - - assertEquals(expected, actualList); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java deleted file mode 100644 index 6b84440..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.function.co.CoMapFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoMapTest implements Serializable { - private static final long serialVersionUID = 1L; - - private final static class MyCoMap implements CoMapFunction { - private static final long serialVersionUID = 1L; - - @Override - public String map1(Double value) { - return value.toString(); - } - - @Override - public String map2(Integer value) { - return value.toString(); - } - } - - @Test - public void coMapTest() { - CoMapInvokable invokable = new CoMapInvokable(new MyCoMap()); - - List expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5"); - List actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3)); - - assertEquals(expectedList, actualList); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java deleted file mode 100644 index 7f23fba..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoStreamReduceTest { - - public static class MyCoReduceFunction implements - CoReduceFunction { - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce1(Integer value1, Integer value2) { - return value1 * value2; - } - - @Override - public String reduce2(String value1, String value2) { - return value1 + value2; - } - - @Override - public Integer map1(Integer value) { - return value; - } - - @Override - public Integer map2(String value) { - return Integer.parseInt(value); - } - - } - - @Test - public void coStreamReduceTest() { - - CoReduceInvokable coReduce = new CoReduceInvokable( - new MyCoReduceFunction()); - - List expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24); - List result = MockCoContext.createAndExecute(coReduce, - Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8")); - - assertEquals(expected1, result); - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java deleted file mode 100644 index a4f6120..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.function.co.CoWindowFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.util.MockCoContext; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class CoWindowTest { - - public static final class MyCoGroup1 implements CoWindowFunction { - - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unused") - @Override - public void coWindow(List first, List second, Collector out) - throws Exception { - Integer count1 = 0; - for (Integer i : first) { - count1++; - } - Integer count2 = 0; - for (Integer i : second) { - count2++; - } - out.collect(count1); - out.collect(count2); - - } - - } - - public static final class MyCoGroup2 implements - CoWindowFunction, Tuple2, Integer> { - - private static final long serialVersionUID = 1L; - - @Override - public void coWindow(List> first, - List> second, Collector out) throws Exception { - - Set firstElements = new HashSet(); - for (Tuple2 value : first) { - firstElements.add(value.f1); - } - for (Tuple2 value : second) { - if (firstElements.contains(value.f1)) { - out.collect(value.f1); - } - } - - } - - } - - private static final class MyTS1 implements Timestamp { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Integer value) { - return value; - } - - } - - private static final class MyTS2 implements Timestamp> { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Tuple2 value) { - return value.f0; - } - - } - - @Test - public void coWindowGroupReduceTest2() throws Exception { - - CoWindowInvokable invokable1 = new CoWindowInvokable( - new MyCoGroup1(), 2, 1, new TimestampWrapper(new MyTS1(), 1), - new TimestampWrapper(new MyTS1(), 1)); - - // Windowsize 2, slide 1 - // 1,2|2,3|3,4|4,5 - - List input11 = new ArrayList(); - input11.add(1); - input11.add(1); - input11.add(2); - input11.add(3); - input11.add(3); - - List input12 = new ArrayList(); - input12.add(1); - input12.add(2); - input12.add(3); - input12.add(3); - input12.add(5); - - // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5) - // expected output: 3,2|3,3|2,2|0,1 - - List expected1 = new ArrayList(); - expected1.add(3); - expected1.add(2); - expected1.add(3); - expected1.add(3); - expected1.add(2); - expected1.add(2); - expected1.add(0); - expected1.add(1); - - List actual1 = MockCoContext.createAndExecute(invokable1, input11, input12); - assertEquals(expected1, actual1); - - CoWindowInvokable, Tuple2, Integer> invokable2 = new CoWindowInvokable, Tuple2, Integer>( - new MyCoGroup2(), 2, 3, new TimestampWrapper>(new MyTS2(), - 1), new TimestampWrapper>(new MyTS2(), 1)); - - // WindowSize 2, slide 3 - // 1,2|4,5|7,8| - - List> input21 = new ArrayList>(); - input21.add(new Tuple2(1, 1)); - input21.add(new Tuple2(1, 2)); - input21.add(new Tuple2(2, 3)); - input21.add(new Tuple2(3, 4)); - input21.add(new Tuple2(3, 5)); - input21.add(new Tuple2(4, 6)); - input21.add(new Tuple2(4, 7)); - input21.add(new Tuple2(5, 8)); - - List> input22 = new ArrayList>(); - input22.add(new Tuple2(1, 1)); - input22.add(new Tuple2(2, 0)); - input22.add(new Tuple2(2, 2)); - input22.add(new Tuple2(3, 9)); - input22.add(new Tuple2(3, 4)); - input22.add(new Tuple2(4, 10)); - input22.add(new Tuple2(5, 8)); - input22.add(new Tuple2(5, 7)); - - List expected2 = new ArrayList(); - expected2.add(1); - expected2.add(2); - expected2.add(8); - expected2.add(7); - - List actual2 = MockCoContext.createAndExecute(invokable2, input21, input22); - assertEquals(expected2, actual2); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java deleted file mode 100644 index b58245a..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.co.CoMapFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.util.TestListResultSink; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class SelfConnectionTest implements Serializable { - - private static final long serialVersionUID = 1L; - - private final int MEMORY_SIZE = 32; - - private static List expected; - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void sameDataStreamTest() { - - StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE); - - TestListResultSink resultSink = new TestListResultSink(); - - Timestamp timeStamp = new Timestamp() { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Integer value) { - return value; - } - - }; - - KeySelector keySelector = new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }; - - DataStream src = env.fromElements(1, 3, 5); - - @SuppressWarnings("unused") - DataStream> dataStream = - src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector) - .map(new MapFunction, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String map(Tuple2 value) throws Exception { - return value.toString(); - } - }) - .addSink(resultSink); - - - try { - env.execute(); - - expected = new ArrayList(); - - expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)")); - - List result = resultSink.getResult(); - - Collections.sort(expected); - Collections.sort(result); - - assertEquals(expected, result); - } catch (Exception e) { - fail(); - e.printStackTrace(); - } - } - - /** - * We connect two different data streams in a chain to a CoMap. - */ - @Test - public void differentDataStreamSameChain() { - - TestListResultSink resultSink = new TestListResultSink(); - - StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE); - - DataStream src = env.fromElements(1, 3, 5); - - DataStream stringMap = src.map(new MapFunction() { - private static final long serialVersionUID = 1L; - - @Override - public String map(Integer value) throws Exception { - return "x " + value; - } - }).setChainingStrategy(StreamInvokable.ChainingStrategy.ALWAYS); - - stringMap.connect(src).map(new CoMapFunction() { - - private static final long serialVersionUID = 1L; - - @Override - public String map1(String value) { - return value; - } - - @Override - public String map2(Integer value) { - return String.valueOf(value + 1); - } - }).addSink(resultSink); - - try { - env.execute(); - } catch (Exception e) { - e.printStackTrace(); - } - - expected = new ArrayList(); - - expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6")); - - List result = resultSink.getResult(); - - Collections.sort(expected); - Collections.sort(result); - - assertEquals(expected, result); - } - - /** - * We connect two different data streams in different chains to a CoMap. - * (This is not actually self-connect.) - */ - @Test - public void differentDataStreamDifferentChain() { - - TestListResultSink resultSink = new TestListResultSink(); - - StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE); - - DataStream src = env.fromElements(1, 3, 5).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER); - - DataStream stringMap = src.flatMap(new FlatMapFunction() { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Collector out) throws Exception { - out.collect("x " + value); - } - }).groupBy(new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(String value) throws Exception { - return value.length(); - } - }); - - DataStream longMap = src.map(new MapFunction() { - - private static final long serialVersionUID = 1L; - - @Override - public Long map(Integer value) throws Exception { - return Long.valueOf(value + 1); - } - }).groupBy(new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override - public Long getKey(Long value) throws Exception { - return value; - } - }); - - - stringMap.connect(longMap).map(new CoMapFunction() { - - private static final long serialVersionUID = 1L; - - @Override - public String map1(String value) { - return value; - } - - @Override - public String map2(Long value) { - return value.toString(); - } - }).addSink(resultSink); - - try { - env.execute(); - } catch (Exception e) { - e.printStackTrace(); - } - - expected = new ArrayList(); - - expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6")); - - List result = resultSink.getResult(); - - Collections.sort(expected); - Collections.sort(result); - - assertEquals(expected, result); - } -}