flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #5342: [FLINK-8470] Timebounded stream join
Date Wed, 24 Jan 2018 10:28:43 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r163506497
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperatorTest.java
---
    @@ -0,0 +1,590 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +import org.junit.runners.Parameterized.Parameters;
    +
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.stream.Collectors;
    +
    +// TODO: Parameterize to use different state backends --> This would require circular
dependency on flink rocksdb
    +@RunWith(Parameterized.class)
    +public class TimeBoundedStreamJoinOperatorTest {
    +
    +	private final boolean lhsFasterThanRhs;
    +
    +	@Parameters(name = "lhs faster than rhs stream: {0}")
    +	public static Boolean[] data() {
    +		return new Boolean[]{true, false};
    +	}
    +
    +	public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
    +		this.lhsFasterThanRhs = lhsFasterThanRhs;
    +	}
    +
    +	@Test // lhs - 2 <= rhs <= rhs + 2
    +	public void testNegativeInclusiveAndNegativeInclusive() throws Exception {
    +
    +		long lowerBound = -2;
    +		boolean lowerBoundInclusive = true;
    +
    +		long upperBound = -1;
    +		boolean upperBoundInclusive = true;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +
    +			testHarness.setup();
    +			testHarness.open();
    +
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput =
Lists.newArrayList(
    +				streamRecordOf(2, 1),
    +				streamRecordOf(3, 1),
    +				streamRecordOf(3, 2),
    +				streamRecordOf(4, 2),
    +				streamRecordOf(4, 3)
    +			);
    +
    +			assertOutput(expectedOutput, testHarness.getOutput());
    +			ensureNoLateData(testHarness.getOutput());
    +		}
    +	}
    +
    +	@Test // lhs - 1 <= rhs <= rhs + 1
    +	public void testNegativeInclusiveAndPositiveInclusive() throws Exception {
    +
    +		long lowerBound = -1;
    +		boolean lowerBoundInclusive = true;
    +
    +		long upperBound = 1;
    +		boolean upperBoundInclusive = true;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +			testHarness.setup();
    +			testHarness.open();
    +
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput =
Lists.newArrayList(
    +				streamRecordOf(1, 1),
    +				streamRecordOf(1, 2),
    +				streamRecordOf(2, 1),
    +				streamRecordOf(2, 2),
    +				streamRecordOf(2, 3),
    +				streamRecordOf(3, 2),
    +				streamRecordOf(3, 3),
    +				streamRecordOf(3, 4),
    +				streamRecordOf(4, 3),
    +				streamRecordOf(4, 4)
    +			);
    +
    +			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
    +
    +			assertOutput(expectedOutput, testHarness.getOutput());
    +			ensureNoLateData(output);
    +
    +		}
    +	}
    +
    +	@Test // lhs + 1 <= rhs <= lhs + 2
    +	public void testPositiveInclusiveAndPositiveInclusive() throws Exception {
    +		long lowerBound = 1;
    +		long upperBound = 2;
    +
    +		boolean lowerBoundInclusive = true;
    +		boolean upperBoundInclusive = true;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +			testHarness.setup();
    +			testHarness.open();
    +
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expected = Lists.newArrayList(
    +				streamRecordOf(1, 2),
    +				streamRecordOf(1, 3),
    +				streamRecordOf(2, 3),
    +				streamRecordOf(2, 4),
    +				streamRecordOf(3, 4)
    +			);
    +
    +			assertOutput(expected, testHarness.getOutput());
    +			ensureNoLateData(testHarness.getOutput());
    +		}
    +	}
    +
    +	@Test
    +	public void testNegativeExclusiveAndNegativeExlusive() throws Exception {
    +		long lowerBound = -3;
    +		boolean lowerBoundInclusive = false;
    +
    +		long upperBound = -1;
    +		boolean upperBoundInclusive = false;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +			testHarness.setup();
    +			testHarness.open();
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput =
Lists.newArrayList(
    +				streamRecordOf(3, 1),
    +				streamRecordOf(4, 2)
    +			);
    +
    +			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
    +
    +			assertOutput(expectedOutput, testHarness.getOutput());
    +			ensureNoLateData(output);
    +		}
    +	}
    +
    +	@Test
    +	public void testNegativeExclusiveAndPositiveExlusive() throws Exception {
    +		long lowerBound = -1;
    +		boolean lowerBoundInclusive = false;
    +
    +		long upperBound = 1;
    +		boolean upperBoundInclusive = false;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +			testHarness.setup();
    +			testHarness.open();
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput =
Lists.newArrayList(
    +				streamRecordOf(1, 1),
    +				streamRecordOf(2, 2),
    +				streamRecordOf(3, 3),
    +				streamRecordOf(4, 4)
    +			);
    +
    +			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
    +
    +			assertOutput(expectedOutput, testHarness.getOutput());
    +			ensureNoLateData(output);
    +		}
    +	}
    +
    +	@Test
    +	public void testPositiveExclusiveAndPositiveExlusive() throws Exception {
    +		long lowerBound = 1;
    +		boolean lowerBoundInclusive = false;
    +
    +		long upperBound = 3;
    +		boolean upperBoundInclusive = false;
    +
    +		try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem,
TestElem>> testHarness
    +				 = createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive))
{
    +
    +			testHarness.setup();
    +			testHarness.open();
    +			prepareTestHarness(testHarness);
    +
    +			List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput =
Lists.newArrayList(
    +				streamRecordOf(1, 3),
    +				streamRecordOf(2, 4)
    +			);
    +
    +			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
    +
    +			assertOutput(expectedOutput, testHarness.getOutput());
    +			ensureNoLateData(output);
    +
    +		}
    +	}
    +
    +	@Test
    +	public void stateGetsCleanedWhenNotNeeded() throws Exception {
    +
    +		long lowerBound = 1;
    +		boolean lowerBoundInclusive = true;
    +
    +		long upperBound = 2;
    +		boolean upperBoundInclusive = true;
    +
    +		TimeBoundedStreamJoinOperator<TestElem, TestElem> operator = new TimeBoundedStreamJoinOperator<>(
    +			lowerBound,
    --- End diff --
    
    Here, and wherever we instantiate an operator, we should now add the serializers for the
input elements. This can be done with the `TypeInformation.of(new TypeHint<TestElem>()
{}).createSerializer(new ExecutionConfig())`.


---

Mime
View raw message