flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [6/8] flink git commit: [FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase
Date Thu, 28 Sep 2017 15:09:11 GMT
[FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/560e5f3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/560e5f3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/560e5f3b

Branch: refs/heads/master
Commit: 560e5f3b4ad9ced5580553c208d341ebc0aa5a18
Parents: c691856
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Sep 27 12:54:19 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Sep 28 17:03:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBReducingStateTest.java         | 239 -------------------
 .../runtime/state/StateBackendTestBase.java     | 174 ++++++++++++++
 .../state/StateSnapshotCompressionTest.java     |   9 +-
 .../state/heap/HeapReducingStateTest.java       | 236 ------------------
 .../state/heap/HeapStateBackendTestBase.java    |   2 +-
 5 files changed, 179 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
deleted file mode 100644
index 0733dce..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link ReducingState} implementation on top of RocksDB.
- */
-public class RocksDBReducingStateTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr =
-				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalReducingState<VoidNamespace, Long> state =
-					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
-				"my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final TimeWindow win1 = new TimeWindow(1000, 2000);
-		final TimeWindow win2 = new TimeWindow(2000, 3000);
-		final TimeWindow win3 = new TimeWindow(3000, 4000);
-
-		final Long expectedResult = 165L;
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			final InternalReducingState<TimeWindow, Long> state =
-					keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(win1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(win2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(win3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(win3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(win3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend
backend) throws Exception {
-		RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>)
backend.createKeyedStateBackend(
-				new DummyEnvironment("TestTask", 1, 0),
-				new JobID(),
-				"test-op",
-				StringSerializer.INSTANCE,
-				16,
-				new KeyGroupRange(2, 3),
-				mock(TaskKvStateRegistry.class));
-
-		keyedBackend.restore(null);
-
-		return keyedBackend;
-	}
-
-	// ------------------------------------------------------------------------
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements ReduceFunction<Long> {
-
-		@Override
-		public Long reduce(Long a, Long b)  {
-			return a + b;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f6b79b1..5b8e8aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
@@ -1536,6 +1537,179 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 	}
 
 	@Test
+	public void testReducingStateAddAndGet() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr =
+			new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			ReducingState<Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE,
stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testReducingStateMerging() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr =
+			new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			final InternalReducingState<Integer, Long> state =
+				(InternalReducingState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE,
stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testFoldingState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 63d2453..1aa6f63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.TestLogger;
@@ -49,7 +48,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -70,7 +69,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -109,7 +108,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -150,7 +149,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
deleted file mode 100644
index 928eaec..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
- */
-public class HeapReducingStateTest extends HeapStateBackendTestBase {
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr =
-				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalReducingState<VoidNamespace, Long> state =
-					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-			state.clear();
-
-			// make sure all lists / maps are cleared
-
-			StateTable<String, VoidNamespace, Long> stateTable =
-					((HeapReducingState<String, VoidNamespace, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
-				"my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final Integer namespace1 = 1;
-		final Integer namespace2 = 2;
-		final Integer namespace3 = 3;
-
-		final Long expectedResult = 165L;
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			final InternalReducingState<Integer, Long> state =
-					keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(namespace2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			// make sure all lists / maps are cleared
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("ghi");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			StateTable<String, Integer, Long> stateTable =
-					((HeapReducingState<String, Integer, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements ReduceFunction<Long> {
-
-		@Override
-		public Long reduce(Long a, Long b)  {
-			return a + b;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index 2136304..b10c2c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -50,7 +50,7 @@ public abstract class HeapStateBackendTestBase {
 		return new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			keySerializer,
-			HeapReducingStateTest.class.getClassLoader(),
+			HeapStateBackendTestBase.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			async,


Mime
View raw message