Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2E753200C07 for ; Sun, 22 Jan 2017 23:12:16 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 29E4C160B45; Sun, 22 Jan 2017 22:12:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CDB23160B39 for ; Sun, 22 Jan 2017 23:12:14 +0100 (CET) Received: (qmail 83522 invoked by uid 500); 22 Jan 2017 22:12:09 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 83401 invoked by uid 99); 22 Jan 2017 22:12:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 22 Jan 2017 22:12:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B59E3DFF80; Sun, 22 Jan 2017 22:12:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Sun, 22 Jan 2017 22:12:12 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/9] flink git commit: [FLINK-5590] [runtime] Add proper internal state hierarchy archived-at: Sun, 22 Jan 2017 22:12:16 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 628d663..9b8af58 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -19,10 +19,10 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.commons.math3.util.ArithmeticUtils; + import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.AppendingState; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MergingState; import org.apache.flink.api.common.state.State; @@ -42,6 +42,9 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalAppendingState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMergingState; import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -107,22 +110,16 @@ public class WindowOperator protected final WindowAssigner windowAssigner; - protected final KeySelector keySelector; - - protected final Trigger trigger; + private final KeySelector keySelector; - protected final StateDescriptor, ?> windowStateDescriptor; + private final Trigger trigger; - protected final ListStateDescriptor> mergingWindowsDescriptor; + private final StateDescriptor, ?> windowStateDescriptor; - /** - * For serializing the key in checkpoints. - */ + /** For serializing the key in checkpoints. */ protected final TypeSerializer keySerializer; - /** - * For serializing the window in checkpoints. - */ + /** For serializing the window in checkpoints. */ protected final TypeSerializer windowSerializer; /** @@ -133,15 +130,23 @@ public class WindowOperator * {@code window.maxTimestamp + allowedLateness} landmark. * */ - protected final long allowedLateness; + private final long allowedLateness; // ------------------------------------------------------------------------ // State that is not checkpointed // ------------------------------------------------------------------------ - /** - * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. - */ + /** The state in which the window contents is stored. Each window is a namespace */ + private transient InternalAppendingState windowState; + + /** The {@link #windowState}, typed to merging state for merging windows. + * Null if the window state is not mergeable */ + private transient InternalMergingState windowMergingState; + + /** The state that holds the merging window metadata (the sets that describe what is merged) */ + private transient InternalListState> mergingSetsState; + + /** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector timestampedCollector; protected transient Context context = new Context(null, null); @@ -234,14 +239,6 @@ public class WindowOperator this.allowedLateness = allowedLateness; this.legacyWindowOperatorType = legacyWindowOperatorType; - if (windowAssigner instanceof MergingWindowAssigner) { - @SuppressWarnings({"unchecked", "rawtypes"}) - TupleSerializer> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} ); - mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer); - } else { - mergingWindowsDescriptor = null; - } - setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -263,6 +260,43 @@ public class WindowOperator } }; + // create (or restore) the state that hold the actual window contents + // NOTE - the state may be null in the case of the overriding evicting window operator + if (windowStateDescriptor != null) { + windowState = (InternalAppendingState) getOrCreateKeyedState(windowSerializer, windowStateDescriptor); + } + + // create the typed and helper states for merging windows + if (windowAssigner instanceof MergingWindowAssigner) { + + // store a typed reference for the state of merging windows - sanity check + if (windowState instanceof InternalMergingState) { + windowMergingState = (InternalMergingState) windowState; + } + // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation) + // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows() + // TODO activate the sanity check once resolved +// else if (windowState != null) { +// throw new IllegalStateException( +// "The window uses a merging assigner, but the window state is not mergeable."); +// } + + @SuppressWarnings("unchecked") + final Class> typedTuple = (Class>) (Class) Tuple2.class; + + final TupleSerializer> tupleSerializer = new TupleSerializer<>( + typedTuple, + new TypeSerializer[] {windowSerializer, windowSerializer} ); + + final ListStateDescriptor> mergingSetsStateDescriptor = + new ListStateDescriptor<>("merging-window-set", tupleSerializer); + + // get the state that stores the merging sets + mergingSetsState = (InternalListState>) + getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor); + mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); + } + registerRestoredLegacyStateState(); } @@ -283,12 +317,11 @@ public class WindowOperator } @Override - @SuppressWarnings("unchecked") public void processElement(StreamRecord element) throws Exception { - Collection elementWindows = windowAssigner.assignWindows( + final Collection elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); - - final K key = (K) getKeyedStateBackend().getCurrentKey(); + + final K key = this.getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { MergingWindowSet mergingWindows = getMergingWindowSet(); @@ -315,11 +348,7 @@ public class WindowOperator } // merge the merged state windows into the newly resulting state window - getKeyedStateBackend().mergePartitionedStates( - stateWindowResult, - mergedStateWindows, - windowSerializer, - (StateDescriptor, ?>) windowStateDescriptor); + windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); } }); @@ -334,8 +363,7 @@ public class WindowOperator throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } - AppendingState windowState = - getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(stateWindow); windowState.add(element.getValue()); context.key = key; @@ -368,8 +396,7 @@ public class WindowOperator continue; } - AppendingState windowState = - getPartitionedState(window, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(window); windowState.add(element.getValue()); context.key = key; @@ -399,8 +426,7 @@ public class WindowOperator context.key = timer.getKey(); context.window = timer.getNamespace(); - AppendingState windowState; - MergingWindowSet mergingWindows = null; + MergingWindowSet mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); @@ -411,12 +437,11 @@ public class WindowOperator // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + + windowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState( - context.window, - windowSerializer, - windowStateDescriptor); + windowState.setCurrentNamespace(context.window); + mergingWindows = null; } ACC contents = windowState.get(); @@ -440,8 +465,7 @@ public class WindowOperator context.key = timer.getKey(); context.window = timer.getNamespace(); - AppendingState windowState; - MergingWindowSet mergingWindows = null; + MergingWindowSet mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); @@ -452,9 +476,10 @@ public class WindowOperator // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(context.window); + mergingWindows = null; } ACC contents = windowState.get(); @@ -507,13 +532,9 @@ public class WindowOperator * {@link MergingWindowSet#persist()}. */ protected MergingWindowSet getMergingWindowSet() throws Exception { - ListState> mergeState = - getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor); - - @SuppressWarnings({"unchecked", "rawtypes"}) - MergingWindowSet mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState); - - return mergingWindows; + @SuppressWarnings("unchecked") + MergingWindowAssigner mergingAssigner = (MergingWindowAssigner) windowAssigner; + return new MergingWindowSet<>(mergingAssigner, mergingSetsState); } /** @@ -655,11 +676,19 @@ public class WindowOperator public > void mergePartitionedState(StateDescriptor stateDescriptor) { if (mergedWindows != null && mergedWindows.size() > 0) { try { - WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(window, - mergedWindows, - windowSerializer, - stateDescriptor); - } catch (Exception e) { + S rawState = getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor); + + if (rawState instanceof InternalMergingState) { + @SuppressWarnings("unchecked") + InternalMergingState mergingState = (InternalMergingState) rawState; + mergingState.mergeNamespaces(window, mergedWindows); + } + else { + throw new IllegalArgumentException( + "The given state descriptor does not refer to a mergeable state (MergingState)"); + } + } + catch (Exception e) { throw new RuntimeException("Error while merging state.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 0e2d1e8..2faa506 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -72,6 +72,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -83,6 +84,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("serial") public class WindowOperatorTest extends TestLogger { // For counting if close() is called the correct number of times on the SumReducer @@ -758,7 +760,7 @@ public class WindowOperatorTest extends TestLogger { 0); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); @@ -846,7 +848,7 @@ public class WindowOperatorTest extends TestLogger { 0); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); @@ -1124,7 +1126,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1184,7 +1186,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1250,7 +1252,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1310,7 +1312,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1385,7 +1387,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1475,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1559,7 +1561,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1643,7 +1645,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1736,7 +1738,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1821,7 +1823,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1902,11 +1904,11 @@ public class WindowOperatorTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction2()), - new EventTimeTriggerAccumGC(LATENESS), + new EventTimeTriggerAccumGC(LATENESS), LATENESS); OneInputStreamOperatorTestHarness, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1929,7 +1931,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } - private class PassThroughFunction2 implements WindowFunction, String, String, TimeWindow> { + private static class PassThroughFunction2 implements WindowFunction, String, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -1960,7 +1962,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2006,7 +2008,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2063,7 +2065,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2108,7 +2110,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2152,7 +2154,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple3> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2172,6 +2174,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } + // TODO this test seems invalid, as it uses the unsupported combination of merging windows and folding window state @Test public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception { final int GAP_SIZE = 3; @@ -2206,7 +2209,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness, Tuple2> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2230,7 +2233,7 @@ public class WindowOperatorTest extends TestLogger { // UDFs // ------------------------------------------------------------------------ - private class PassThroughFunction implements WindowFunction, Tuple2, String, TimeWindow> { + private static class PassThroughFunction implements WindowFunction, Tuple2, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2289,7 +2292,7 @@ public class WindowOperatorTest extends TestLogger { } @SuppressWarnings("unchecked") - private static class Tuple2ResultSortComparator implements Comparator { + private static class Tuple2ResultSortComparator implements Comparator, Serializable { @Override public int compare(Object o1, Object o2) { if (o1 instanceof Watermark || o2 instanceof Watermark) { @@ -2311,7 +2314,7 @@ public class WindowOperatorTest extends TestLogger { } @SuppressWarnings("unchecked") - private static class Tuple3ResultSortComparator implements Comparator { + private static class Tuple3ResultSortComparator implements Comparator, Serializable { @Override public int compare(Object o1, Object o2) { if (o1 instanceof Watermark || o2 instanceof Watermark) { @@ -2403,15 +2406,11 @@ public class WindowOperatorTest extends TestLogger { * purge the state of the fired window. This is to test the state * garbage collection mechanism. */ - public class EventTimeTriggerAccumGC extends Trigger { + public static class EventTimeTriggerAccumGC extends Trigger { private static final long serialVersionUID = 1L; private long cleanupTime; - private EventTimeTriggerAccumGC() { - cleanupTime = 0L; - } - public EventTimeTriggerAccumGC(long cleanupTime) { this.cleanupTime = cleanupTime; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 0e1aca0..0562443 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -18,9 +18,7 @@ package org.apache.flink.test.query; - import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -30,10 +28,14 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -59,14 +61,18 @@ public final class KVStateRequestSerializerRocksDBTest { */ final static class RocksDBKeyedStateBackend2 extends RocksDBKeyedStateBackend { - RocksDBKeyedStateBackend2(final JobID jobId, - final String operatorIdentifier, - final ClassLoader userCodeClassLoader, - final File instanceBasePath, final DBOptions dbOptions, - final ColumnFamilyOptions columnFamilyOptions, - final TaskKvStateRegistry kvStateRegistry, - final TypeSerializer keySerializer, final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange) throws Exception { + RocksDBKeyedStateBackend2( + final JobID jobId, + final String operatorIdentifier, + final ClassLoader userCodeClassLoader, + final File instanceBasePath, + final DBOptions dbOptions, + final ColumnFamilyOptions columnFamilyOptions, + final TaskKvStateRegistry kvStateRegistry, + final TypeSerializer keySerializer, + final int numberOfKeyGroups, + final KeyGroupRange keyGroupRange) throws Exception { + super(jobId, operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, @@ -74,9 +80,10 @@ public final class KVStateRequestSerializerRocksDBTest { } @Override - public ListState createListState( + public InternalListState createListState( final TypeSerializer namespaceSerializer, final ListStateDescriptor stateDesc) throws Exception { + return super.createListState(namespaceSerializer, stateDesc); } } @@ -90,8 +97,7 @@ public final class KVStateRequestSerializerRocksDBTest { */ @Test public void testListSerialization() throws Exception { - final long key = 0l; - TypeSerializer valueSerializer = LongSerializer.INSTANCE; + final long key = 0L; // objects for RocksDB state list serialisation DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); @@ -110,9 +116,10 @@ public final class KVStateRequestSerializerRocksDBTest { ); longHeapKeyedStateBackend.setCurrentKey(key); - final ListState listState = longHeapKeyedStateBackend + final InternalListState listState = longHeapKeyedStateBackend .createListState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + KvStateRequestSerializerTest.testListSerialization(key, listState); } }