Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0F6D0200C41 for ; Fri, 24 Mar 2017 18:51:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0E0AE160B96; Fri, 24 Mar 2017 17:51:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CB23F160B75 for ; Fri, 24 Mar 2017 18:51:44 +0100 (CET) Received: (qmail 79378 invoked by uid 500); 24 Mar 2017 17:51:44 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 79367 invoked by uid 99); 24 Mar 2017 17:51:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Mar 2017 17:51:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BC975DFC15; Fri, 24 Mar 2017 17:51:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Fri, 24 Mar 2017 17:51:43 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-5715] Asynchronous snapshots for heap-based keyed state backend (backport from 1.3) archived-at: Fri, 24 Mar 2017 17:51:47 -0000 Repository: flink Updated Branches: refs/heads/release-1.2 b703a24d4 -> c6a807250 http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java new file mode 100644 index 0000000..255bd46 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncFileStateBackendTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class AsyncFileStateBackendTest extends StateBackendTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Override + protected AsyncFsStateBackend getStateBackend() throws Exception { + File checkpointPath = tempFolder.newFolder(); + return new AsyncFsStateBackend(localFileUri(checkpointPath)); + } + + // disable these because the verification does not work for this state backend + @Override + @Test + public void testValueStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testListStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testReducingStateRestoreWithWrongSerializers() {} + + @Test + public void testStateOutputStream() throws IOException { + File basePath = tempFolder.newFolder().getAbsoluteFile(); + + try { + // the state backend has a very low in-mem state threshold (15 bytes) + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(basePath.toURI(), 15)); + 
JobID jobId = new JobID(); + + // we know how FsCheckpointStreamFactory is implemented so we know where it + // will store checkpoints + File checkpointPath = new File(basePath.getAbsolutePath(), jobId.toString()); + + CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_op"); + + byte[] state1 = new byte[1274673]; + byte[] state2 = new byte[1]; + byte[] state3 = new byte[0]; + byte[] state4 = new byte[177]; + + Random rnd = new Random(); + rnd.nextBytes(state1); + rnd.nextBytes(state2); + rnd.nextBytes(state3); + rnd.nextBytes(state4); + + long checkpointId = 97231523452L; + + CheckpointStreamFactory.CheckpointStateOutputStream stream1 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + CheckpointStreamFactory.CheckpointStateOutputStream stream2 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + CheckpointStreamFactory.CheckpointStateOutputStream stream3 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + stream1.write(state1); + stream2.write(state2); + stream3.write(state3); + + FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle(); + ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); + ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle(); + + // use with try-with-resources + StreamStateHandle handle4; + try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { + stream4.write(state4); + handle4 = stream4.closeAndGetHandle(); + } + + // close before accessing handle + CheckpointStreamFactory.CheckpointStateOutputStream stream5 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + stream5.write(state4); + stream5.close(); + try { + stream5.closeAndGetHandle(); + fail(); 
+ } catch (IOException e) { + // uh-huh + } + + validateBytesInStream(handle1.openInputStream(), state1); + handle1.discardState(); + assertFalse(isDirectoryEmpty(basePath)); + ensureLocalFileDeleted(handle1.getFilePath()); + + validateBytesInStream(handle2.openInputStream(), state2); + handle2.discardState(); + + // nothing was written to the stream, so it will return nothing + assertNull(handle3); + + validateBytesInStream(handle4.openInputStream(), state4); + handle4.discardState(); + assertTrue(isDirectoryEmpty(checkpointPath)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static void ensureLocalFileDeleted(Path path) { + URI uri = path.toUri(); + if ("file".equals(uri.getScheme())) { + File file = new File(uri.getPath()); + assertFalse("file not properly deleted", file.exists()); + } + else { + throw new IllegalArgumentException("not a local path"); + } + } + + private static void deleteDirectorySilently(File dir) { + try { + FileUtils.deleteDirectory(dir); + } + catch (IOException ignored) {} + } + + private static boolean isDirectoryEmpty(File directory) { + if (!directory.exists()) { + return true; + } + String[] nested = directory.list(); + return nested == null || nested.length == 0; + } + + private static String localFileUri(File path) { + return path.toURI().toString(); + } + + private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { + try { + byte[] holder = new byte[data.length]; + + int pos = 0; + int read; + while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) { + pos += read; + } + + assertEquals("not enough data", holder.length, pos); + assertEquals("too much data", -1, is.read()); + assertArrayEquals("wrong data", data, holder); + } finally { + is.close(); + 
} + } + + @Test + public void testConcurrentMapIfQueryable() throws Exception { + //unsupported + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java new file mode 100644 index 0000000..b1a323b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AsyncMemoryStateBackendTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.async.AsyncMemoryStateBackend; +import org.junit.Test; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link MemoryStateBackend}. + */ +public class AsyncMemoryStateBackendTest extends StateBackendTestBase { + + @Override + protected AsyncMemoryStateBackend getStateBackend() throws Exception { + return new AsyncMemoryStateBackend(); + } + + // disable these because the verification does not work for this state backend + @Override + @Test + public void testValueStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testListStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testReducingStateRestoreWithWrongSerializers() {} + + @Test + @SuppressWarnings("unchecked, deprecation") + public void testNumStateEntries() throws Exception { + KeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + AsyncHeapKeyedStateBackend heapBackend = (AsyncHeapKeyedStateBackend) backend; + + assertEquals(0, heapBackend.numStateEntries()); + + ValueState state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(0); + state.update("hello"); + state.update("ciao"); + + assertEquals(1, heapBackend.numStateEntries()); + + backend.setCurrentKey(42); + state.update("foo"); + + assertEquals(2, heapBackend.numStateEntries()); + + backend.setCurrentKey(0); + state.clear(); + + assertEquals(1, heapBackend.numStateEntries()); + + backend.setCurrentKey(42); + state.clear(); + + assertEquals(0, heapBackend.numStateEntries()); + + backend.dispose(); + } + + @Test + public void testOversizedState() { + try { + MemoryStateBackend backend = new MemoryStateBackend(10); + CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op"); + + HashMap state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + try { + CheckpointStreamFactory.CheckpointStateOutputStream outStream = + streamFactory.createCheckpointStateOutputStream(12, 459); + + ObjectOutputStream oos = new ObjectOutputStream(outStream); + oos.writeObject(state); + + oos.flush(); + + outStream.closeAndGetHandle(); + + fail("this should cause an exception"); + } + catch (IOException e) { + // now darling, isn't that exactly what we wanted? 
+ } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testStateStream() { + try { + MemoryStateBackend backend = new MemoryStateBackend(); + CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op"); + + HashMap state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + CheckpointStreamFactory.CheckpointStateOutputStream os = streamFactory.createCheckpointStateOutputStream(1, 2); + ObjectOutputStream oos = new ObjectOutputStream(os); + oos.writeObject(state); + oos.flush(); + StreamStateHandle handle = os.closeAndGetHandle(); + + assertNotNull(handle); + + try (ObjectInputStream ois = new ObjectInputStream(handle.openInputStream())) { + assertEquals(state, ois.readObject()); + assertTrue(ois.available() <= 0); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testOversizedStateStream() { + try { + MemoryStateBackend backend = new MemoryStateBackend(10); + CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op"); + + HashMap state = new HashMap<>(); + state.put("hey there", 2); + state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + + CheckpointStreamFactory.CheckpointStateOutputStream os = streamFactory.createCheckpointStateOutputStream(1, 2); + ObjectOutputStream oos = new ObjectOutputStream(os); + + try { + oos.writeObject(state); + oos.flush(); + os.closeAndGetHandle(); + fail("this should cause an exception"); + } + catch (IOException e) { + // oh boy! what an exception! 
+ } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConcurrentMapIfQueryable() throws Exception { + //unsupported + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index c267afc..b196e71 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -61,7 +61,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase backend = createKeyedBackend(IntSerializer.INSTANCE); http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index e821bcf..61de1e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -39,6 +39,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import 
org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -48,9 +49,13 @@ import org.apache.flink.runtime.query.KvStateRegistryListener; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.heap.AbstractHeapState; import org.apache.flink.runtime.state.heap.StateTable; +import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.types.IntValue; import org.apache.flink.util.FutureUtil; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -60,6 +65,7 @@ import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -1432,6 +1438,150 @@ public abstract class StateBackendTestBase exten } } + @Test + public void testAsyncSnapshot() throws Exception { + OneShotLatch waiter = new OneShotLatch(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + streamFactory.setWaiterLatch(waiter); + + AbstractKeyedStateBackend backend = null; + KeyGroupsStateHandle stateHandle = null; + + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + + if (!(backend instanceof AsyncHeapKeyedStateBackend)) { + return; + } + + ValueState valueState = backend.createValueState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + ((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 10; ++i) { + backend.setCurrentKey(i); + valueState.update(i); + } + + RunnableFuture snapshot = + backend.snapshot(0L, 0L, 
streamFactory); + Thread runner = new Thread(snapshot); + runner.start(); + for (int i = 0; i < 20; ++i) { + backend.setCurrentKey(i); + valueState.update(i + 1); + if (10 == i) { + waiter.await(); + } + } + + runner.join(); + stateHandle = snapshot.get(); + + // test isolation + for (int i = 0; i < 20; ++i) { + backend.setCurrentKey(i); + Assert.assertEquals(i + 1, (int) valueState.value()); + } + + } finally { + if (null != backend) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + + Assert.assertNotNull(stateHandle); + + backend = createKeyedBackend(IntSerializer.INSTANCE); + try { + backend.restore(Collections.singleton(stateHandle)); + ValueState valueState = backend.createValueState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + ((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 10; ++i) { + backend.setCurrentKey(i); + Assert.assertEquals(i, (int) valueState.value()); + } + + backend.setCurrentKey(11); + Assert.assertEquals(null, valueState.value()); + } finally { + if (null != backend) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + @Test + public void testAsyncSnapshotCancellation() throws Exception { + OneShotLatch blocker = new OneShotLatch(); + OneShotLatch waiter = new OneShotLatch(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + streamFactory.setWaiterLatch(waiter); + streamFactory.setBlockerLatch(blocker); + streamFactory.setAfterNumberInvocations(100); + + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + + if (!(backend instanceof AsyncHeapKeyedStateBackend)) { + return; + } + + ValueState valueState = backend.createValueState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + ((KvState)valueState).setCurrentNamespace(VoidNamespace.INSTANCE); + + for 
(int i = 0; i < 10; ++i) { + backend.setCurrentKey(i); + valueState.update(i); + } + + RunnableFuture snapshot = + backend.snapshot(0L, 0L, streamFactory); + + Thread runner = new Thread(snapshot); + runner.start(); + + // wait until the code reached some stream read + waiter.await(); + + // close the backend to see if the close is propagated to the stream + backend.close(); + + //unblock the stream so that it can run into the IOException + blocker.trigger(); + + //dispose the backend + backend.dispose(); + + runner.join(); + + try { + snapshot.get(); + fail("Close was not propagated."); + } catch (ExecutionException ex) { + //ignore + } + + } finally { + if (null != backend) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + private static class AppendingReduce implements ReduceFunction { @Override public String reduce(String value1, String value2) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java new file mode 100644 index 0000000..fb36d67 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableTest.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class CopyOnWriteStateTableTest { + + /** + * Testing the basic map operations. + */ + @Test + public void testPutGetRemoveContainsTransform() throws Exception { + RegisteredBackendStateMetaInfo> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. 
+ + final MockInternalKeyContext keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + ArrayList state_1_1 = new ArrayList<>(); + state_1_1.add(41); + ArrayList state_2_1 = new ArrayList<>(); + state_2_1.add(42); + ArrayList state_1_2 = new ArrayList<>(); + state_1_2.add(43); + + Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1)); + Assert.assertEquals(state_1_1, stateTable.get(1, 1)); + Assert.assertEquals(1, stateTable.size()); + + Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1)); + Assert.assertEquals(state_2_1, stateTable.get(2, 1)); + Assert.assertEquals(2, stateTable.size()); + + Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2)); + Assert.assertEquals(state_1_2, stateTable.get(1, 2)); + Assert.assertEquals(3, stateTable.size()); + + Assert.assertTrue(stateTable.containsKey(2, 1)); + Assert.assertFalse(stateTable.containsKey(3, 1)); + Assert.assertFalse(stateTable.containsKey(2, 3)); + stateTable.put(2, 1, null); + Assert.assertTrue(stateTable.containsKey(2, 1)); + Assert.assertEquals(3, stateTable.size()); + Assert.assertNull(stateTable.get(2, 1)); + stateTable.put(2, 1, state_2_1); + Assert.assertEquals(3, stateTable.size()); + + Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 1)); + Assert.assertFalse(stateTable.containsKey(2, 1)); + Assert.assertEquals(2, stateTable.size()); + + stateTable.remove(1, 2); + Assert.assertFalse(stateTable.containsKey(1, 2)); + Assert.assertEquals(1, stateTable.size()); + + Assert.assertNull(stateTable.removeAndGetOld(4, 2)); + Assert.assertEquals(1, stateTable.size()); + + StateTransformationFunction, Integer> function = + new StateTransformationFunction, Integer>() { + @Override + public ArrayList apply(ArrayList previousState, Integer value) throws Exception { + previousState.add(value); + return previousState; + } + }; + + final int value = 4711; + 
stateTable.transform(1, 1, value, function); + state_1_1 = function.apply(state_1_1, value); + Assert.assertEquals(state_1_1, stateTable.get(1, 1)); + } + + /** + * This test triggers incremental rehash and tests for corruptions. + */ + @Test + public void testIncrementalRehash() { + RegisteredBackendStateMetaInfo> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. + + final MockInternalKeyContext keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + int insert = 0; + int remove = 0; + while (!stateTable.isRehashing()) { + stateTable.put(insert++, 0, new ArrayList()); + if (insert % 8 == 0) { + stateTable.remove(remove++, 0); + } + } + Assert.assertEquals(insert - remove, stateTable.size()); + while (stateTable.isRehashing()) { + stateTable.put(insert++, 0, new ArrayList()); + if (insert % 8 == 0) { + stateTable.remove(remove++, 0); + } + } + Assert.assertEquals(insert - remove, stateTable.size()); + + for (int i = 0; i < insert; ++i) { + if (i < remove) { + Assert.assertFalse(stateTable.containsKey(i, 0)); + } else { + Assert.assertTrue(stateTable.containsKey(i, 0)); + } + } + } + + /** + * This test does some random modifications to a state table and a reference (hash map). Then draws snapshots, + * performs more modifications and checks snapshot integrity. + */ + @Test + public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception { + + final RegisteredBackendStateMetaInfo> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. 
+ + final MockInternalKeyContext keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + final HashMap, ArrayList> referenceMap = new HashMap<>(); + + final Random random = new Random(42); + + // holds snapshots from the map under test + CopyOnWriteStateTable.StateTableEntry>[] snapshot = null; + int snapshotSize = 0; + + // holds a reference snapshot from our reference map that we compare against + Tuple3>[] reference = null; + + int val = 0; + + + int snapshotCounter = 0; + int referencedSnapshotId = 0; + + final StateTransformationFunction, Integer> transformationFunction = + new StateTransformationFunction, Integer>() { + @Override + public ArrayList apply(ArrayList previousState, Integer value) throws Exception { + if (previousState == null) { + previousState = new ArrayList<>(); + } + previousState.add(value); + // we give back the original, attempting to spot errors in to copy-on-write + return previousState; + } + }; + + // the main loop for modifications + for (int i = 0; i < 10_000_000; ++i) { + + int key = random.nextInt(20); + int namespace = random.nextInt(4); + Tuple2 compositeKey = new Tuple2<>(key, namespace); + + int op = random.nextInt(7); + + ArrayList state = null; + ArrayList referenceState = null; + + switch (op) { + case 0: + case 1: { + state = stateTable.get(key, namespace); + referenceState = referenceMap.get(compositeKey); + if (null == state) { + state = new ArrayList<>(); + stateTable.put(key, namespace, state); + referenceState = new ArrayList<>(); + referenceMap.put(compositeKey, referenceState); + } + break; + } + case 2: { + stateTable.put(key, namespace, new ArrayList()); + referenceMap.put(compositeKey, new ArrayList()); + break; + } + case 3: { + state = stateTable.putAndGetOld(key, namespace, new ArrayList()); + referenceState = referenceMap.put(compositeKey, new ArrayList()); + break; + } + case 4: { + 
stateTable.remove(key, namespace); + referenceMap.remove(compositeKey); + break; + } + case 5: { + state = stateTable.removeAndGetOld(key, namespace); + referenceState = referenceMap.remove(compositeKey); + break; + } + case 6: { + final int updateValue = random.nextInt(1000); + stateTable.transform(key, namespace, updateValue, transformationFunction); + referenceMap.put(compositeKey, transformationFunction.apply( + referenceMap.remove(compositeKey), updateValue)); + break; + } + default: { + Assert.fail("Unknown op-code " + op); + } + } + + Assert.assertEquals(referenceMap.size(), stateTable.size()); + + if (state != null) { + // mutate the states a bit... + if (random.nextBoolean() && !state.isEmpty()) { + state.remove(state.size() - 1); + referenceState.remove(referenceState.size() - 1); + } else { + state.add(val); + referenceState.add(val); + ++val; + } + } + + Assert.assertEquals(referenceState, state); + + // snapshot triggering / comparison / release + if (i > 0 && i % 500 == 0) { + + if (snapshot != null) { + // check our referenced snapshot + deepCheck(reference, convert(snapshot, snapshotSize)); + + if (i % 1_000 == 0) { + // draw and release some other snapshot while holding on the old snapshot + ++snapshotCounter; + stateTable.snapshotTableArrays(); + stateTable.releaseSnapshot(snapshotCounter); + } + + //release the snapshot after some time + if (i % 5_000 == 0) { + snapshot = null; + reference = null; + snapshotSize = 0; + stateTable.releaseSnapshot(referencedSnapshotId); + } + + } else { + // if there is no more referenced snapshot, we create one + ++snapshotCounter; + referencedSnapshotId = snapshotCounter; + snapshot = stateTable.snapshotTableArrays(); + snapshotSize = stateTable.size(); + reference = manualDeepDump(referenceMap); + } + } + } + } + + /** + * This tests for the copy-on-write contracts, e.g. ensures that no copy-on-write is active after all snapshots are + * released. 
+ */ + @Test + public void testCopyOnWriteContracts() { + RegisteredBackendStateMetaInfo> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. + + final MockInternalKeyContext keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + ArrayList originalState1 = new ArrayList<>(1); + ArrayList originalState2 = new ArrayList<>(1); + ArrayList originalState3 = new ArrayList<>(1); + ArrayList originalState4 = new ArrayList<>(1); + ArrayList originalState5 = new ArrayList<>(1); + + originalState1.add(1); + originalState2.add(2); + originalState3.add(3); + originalState4.add(4); + originalState5.add(5); + + stateTable.put(1, 1, originalState1); + stateTable.put(2, 1, originalState2); + stateTable.put(4, 1, originalState4); + stateTable.put(5, 1, originalState5); + + // no snapshot taken, we get the original back + Assert.assertTrue(stateTable.get(1, 1) == originalState1); + CopyOnWriteStateTableSnapshot> snapshot1 = stateTable.createSnapshot(); + // after snapshot1 is taken, we get a copy... 
+ final ArrayList copyState = stateTable.get(1, 1); + Assert.assertFalse(copyState == originalState1); + // ...and the copy is equal + Assert.assertEquals(originalState1, copyState); + + // we make an insert AFTER snapshot1 + stateTable.put(3, 1, originalState3); + + // on repeated lookups, we get the same copy because no further snapshot was taken + Assert.assertTrue(copyState == stateTable.get(1, 1)); + + // we take snapshot2 + CopyOnWriteStateTableSnapshot> snapshot2 = stateTable.createSnapshot(); + // after the second snapshot, copy-on-write is active again for old entries + Assert.assertFalse(copyState == stateTable.get(1, 1)); + // and equality still holds + Assert.assertEquals(copyState, stateTable.get(1, 1)); + + // after releasing snapshot2 + stateTable.releaseSnapshot(snapshot2); + // we still get the original of the untouched late insert (after snapshot1) + Assert.assertTrue(originalState3 == stateTable.get(3, 1)); + // but copy-on-write is still active for older inserts (before snapshot1) + Assert.assertFalse(originalState4 == stateTable.get(4, 1)); + + // after releasing snapshot1 + stateTable.releaseSnapshot(snapshot1); + // no copy-on-write is active + Assert.assertTrue(originalState5 == stateTable.get(5, 1)); + } + + @SuppressWarnings("unchecked") + private static Tuple3[] convert(CopyOnWriteStateTable.StateTableEntry[] snapshot, int mapSize) { + + Tuple3[] result = new Tuple3[mapSize]; + int pos = 0; + for (CopyOnWriteStateTable.StateTableEntry entry : snapshot) { + while (null != entry) { + result[pos++] = new Tuple3<>(entry.getKey(), entry.getNamespace(), entry.getState()); + entry = entry.next; + } + } + Assert.assertEquals(mapSize, pos); + return result; + } + + @SuppressWarnings("unchecked") + private Tuple3>[] manualDeepDump( + HashMap, + ArrayList> map) { + + Tuple3>[] result = new Tuple3[map.size()]; + int pos = 0; + for (Map.Entry, ArrayList> entry : map.entrySet()) { + Integer key = entry.getKey().f0; + Integer namespace = 
entry.getKey().f1; + result[pos++] = new Tuple3<>(key, namespace, new ArrayList<>(entry.getValue())); + } + return result; + } + + private void deepCheck( + Tuple3>[] a, + Tuple3>[] b) { + + if (a == b) { + return; + } + + Assert.assertEquals(a.length, b.length); + + Comparator>> comparator = + new Comparator>>() { + + @Override + public int compare(Tuple3> o1, Tuple3> o2) { + int namespaceDiff = o1.f1 - o2.f1; + return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0; + } + }; + + Arrays.sort(a, comparator); + Arrays.sort(b, comparator); + + for (int i = 0; i < a.length; ++i) { + Tuple3> av = a[i]; + Tuple3> bv = b[i]; + + Assert.assertEquals(av.f0, bv.f0); + Assert.assertEquals(av.f1, bv.f1); + Assert.assertEquals(av.f2, bv.f2); + } + } + + static class MockInternalKeyContext implements InternalKeyContext { + + private T key; + private final TypeSerializer serializer; + private final KeyGroupRange keyGroupRange; + + public MockInternalKeyContext(TypeSerializer serializer) { + this.serializer = serializer; + this.keyGroupRange = new KeyGroupRange(0, 0); + } + + public void setKey(T key) { + this.key = key; + } + + @Override + public T getCurrentKey() { + return key; + } + + @Override + public int getCurrentKeyGroupIndex() { + return 0; + } + + @Override + public int getNumberOfKeyGroups() { + return 1; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public TypeSerializer getKeySerializer() { + return serializer; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java new file mode 100644 index 0000000..a7c2d15 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapListStateTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the simple Java heap objects implementation of the {@link ListState}. 
+ */ +@SuppressWarnings("unchecked") +public class HeapListStateTest extends HeapStateBackendTestBase { + + @Test + public void testAddAndGet() throws Exception { + + final ListStateDescriptor stateDescr = new ListStateDescriptor<>("my-state", Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final AsyncHeapKeyedStateBackend keyedBackend = createKeyedBackend(); + + try { + ListState state = + keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr); + + AbstractHeapMergingState mergingState = + (AbstractHeapMergingState) state; + + mergingState.setCurrentNamespace(VoidNamespace.INSTANCE); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + state.add(17L); + state.add(11L); + assertEquals(asList(17L, 11L), state.get()); + + keyedBackend.setCurrentKey("abc"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertNull(state.get()); + state.add(1L); + state.add(2L); + + keyedBackend.setCurrentKey("def"); + assertEquals(asList(17L, 11L), state.get()); + state.clear(); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + state.add(3L); + state.add(2L); + state.add(1L); + + keyedBackend.setCurrentKey("def"); + assertNull(state.get()); + + keyedBackend.setCurrentKey("g"); + assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get()); + state.clear(); + + // make sure all lists / maps are cleared + + StateTable> stateTable = + ((HeapListState) state).getStateTable(); + + assertTrue(mergingState.getStateTable().isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + @Test + public void testMerging() throws Exception { + + final ListStateDescriptor stateDescr = new ListStateDescriptor<>("my-state", Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + final Set 
expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L)); + + final AsyncHeapKeyedStateBackend keyedBackend = createKeyedBackend(); + + try { + ListState state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr); + + AbstractHeapMergingState mergingState = + (AbstractHeapMergingState) state; + + // populate the different namespaces + // - abc spreads the values over three namespaces + // - def spreads the values over two namespaces (one empty) + // - ghi is empty + // - jkl has all elements already in the target namespace + // - mno has all elements already in one source namespace + + keyedBackend.setCurrentKey("abc"); + mergingState.setCurrentNamespace(namespace1); + state.add(33L); + state.add(55L); + + mergingState.setCurrentNamespace(namespace2); + state.add(22L); + state.add(11L); + + mergingState.setCurrentNamespace(namespace3); + state.add(44L); + + keyedBackend.setCurrentKey("def"); + mergingState.setCurrentNamespace(namespace1); + state.add(11L); + state.add(44L); + + mergingState.setCurrentNamespace(namespace3); + state.add(22L); + state.add(55L); + state.add(33L); + + keyedBackend.setCurrentKey("jkl"); + mergingState.setCurrentNamespace(namespace1); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("mno"); + mergingState.setCurrentNamespace(namespace3); + state.add(11L); + state.add(22L); + state.add(33L); + state.add(44L); + state.add(55L); + + keyedBackend.setCurrentKey("abc"); + //TODO + mergingState.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + mergingState.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("def"); + mergingState.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + mergingState.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("ghi"); + mergingState.mergeNamespaces(namespace1, asList(namespace2,
namespace3)); + mergingState.setCurrentNamespace(namespace1); + assertNull(state.get()); + + keyedBackend.setCurrentKey("jkl"); + mergingState.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + mergingState.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + keyedBackend.setCurrentKey("mno"); + mergingState.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + mergingState.setCurrentNamespace(namespace1); + validateResult(state.get(), expectedResult); + + // make sure all lists / maps are cleared + + keyedBackend.setCurrentKey("abc"); + mergingState.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("def"); + mergingState.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("ghi"); + mergingState.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("jkl"); + mergingState.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("mno"); + mergingState.setCurrentNamespace(namespace1); + state.clear(); + + assertTrue(mergingState.getStateTable().isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + private static void validateResult(Iterable values, Set expected) { + int num = 0; + for (T v : values) { + num++; + assertTrue(expected.contains(v)); + } + + assertEquals(expected.size(), num); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java new file mode 100644 index 0000000..5da0fef --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapReducingStateTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the simple Java heap objects implementation of the {@link ReducingState}. 
+ */ +@SuppressWarnings("unchecked") +public class HeapReducingStateTest extends HeapStateBackendTestBase { + + @Test + public void testAddAndGet() throws Exception { + + final ReducingStateDescriptor stateDescr = + new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + final AsyncHeapKeyedStateBackend keyedBackend = createKeyedBackend(); + + try { + ReducingState reducingState = + keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr); + + AbstractHeapMergingState state = + (AbstractHeapMergingState) reducingState; + + state.setCurrentNamespace(VoidNamespace.INSTANCE); + + keyedBackend.setCurrentKey("abc"); + assertNull(reducingState.get()); + + keyedBackend.setCurrentKey("def"); + assertNull(reducingState.get()); + reducingState.add(17L); + reducingState.add(11L); + assertEquals(28L, reducingState.get().longValue()); + + keyedBackend.setCurrentKey("abc"); + assertNull(reducingState.get()); + + keyedBackend.setCurrentKey("g"); + assertNull(reducingState.get()); + reducingState.add(1L); + reducingState.add(2L); + + keyedBackend.setCurrentKey("def"); + assertEquals(28L, reducingState.get().longValue()); + state.clear(); + assertNull(reducingState.get()); + + keyedBackend.setCurrentKey("g"); + reducingState.add(3L); + reducingState.add(2L); + reducingState.add(1L); + + keyedBackend.setCurrentKey("def"); + assertNull(reducingState.get()); + + keyedBackend.setCurrentKey("g"); + assertEquals(9L, reducingState.get().longValue()); + state.clear(); + + // make sure all lists / maps are cleared + assertTrue(state.getStateTable().isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + @Test + public void testMerging() throws Exception { + + final ReducingStateDescriptor stateDescr = new ReducingStateDescriptor<>( + "my-state", new AddingFunction(), Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); 
+ + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + final Long expectedResult = 165L; + + final AsyncHeapKeyedStateBackend keyedBackend = createKeyedBackend(); + + try { + final ReducingState reducingState = + keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr); + + AbstractHeapMergingState state = + (AbstractHeapMergingState) reducingState; + + // populate the different namespaces + // - abc spreads the values over three namespaces + // - def spreads the values over two namespaces (one empty) + // - ghi is empty + // - jkl has all elements already in the target namespace + // - mno has all elements already in one source namespace + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + reducingState.add(33L); + reducingState.add(55L); + + state.setCurrentNamespace(namespace2); + reducingState.add(22L); + reducingState.add(11L); + + state.setCurrentNamespace(namespace3); + reducingState.add(44L); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + reducingState.add(11L); + reducingState.add(44L); + + state.setCurrentNamespace(namespace3); + reducingState.add(22L); + reducingState.add(55L); + reducingState.add(33L); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + reducingState.add(11L); + reducingState.add(22L); + reducingState.add(33L); + reducingState.add(44L); + reducingState.add(55L); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + reducingState.add(11L); + reducingState.add(22L); + reducingState.add(33L); + reducingState.add(44L); + reducingState.add(55L); + + keyedBackend.setCurrentKey("abc"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, reducingState.get()); + + keyedBackend.setCurrentKey("def"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); +
state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, reducingState.get()); + + keyedBackend.setCurrentKey("ghi"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertNull(reducingState.get()); + + keyedBackend.setCurrentKey("jkl"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, reducingState.get()); + + keyedBackend.setCurrentKey("mno"); + state.mergeNamespaces(namespace1, asList(namespace2, namespace3)); + state.setCurrentNamespace(namespace1); + assertEquals(expectedResult, reducingState.get()); + + // make sure all lists / maps are cleared + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("ghi"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + state.clear(); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace1); + state.clear(); + + assertTrue(state.getStateTable().isEmpty()); + } + finally { + keyedBackend.close(); + keyedBackend.dispose(); + } + } + + // ------------------------------------------------------------------------ + // test functions + // ------------------------------------------------------------------------ + + @SuppressWarnings("serial") + private static class AddingFunction implements ReduceFunction { + + @Override + public Long reduce(Long a, Long b) { + return a + b; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java new file mode 100644 index 0000000..0bb3775 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/async/HeapStateBackendTestBase.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; + +import static org.mockito.Mockito.mock; + +public abstract class HeapStateBackendTestBase { + + public AsyncHeapKeyedStateBackend createKeyedBackend() throws Exception { + return new AsyncHeapKeyedStateBackend<>( + mock(TaskKvStateRegistry.class), + StringSerializer.INSTANCE, + HeapStateBackendTestBase.class.getClassLoader(), + 16, + new KeyGroupRange(0, 15)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java new file mode 100644 index 0000000..291f3ed --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import java.io.IOException; + +/** + * {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO + */ +@VisibleForTesting +@Internal +public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { + + private final int maxSize; + private volatile int afterNumberInvocations; + private volatile OneShotLatch blocker; + private volatile OneShotLatch waiter; + + MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; + + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { + return lastCreatedStream; + } + + public BlockerCheckpointStreamFactory(int maxSize) { + this.maxSize = maxSize; + } + + public void setAfterNumberInvocations(int afterNumberInvocations) { + this.afterNumberInvocations = afterNumberInvocations; + } + + public void setBlockerLatch(OneShotLatch latch) { + this.blocker = latch; + } + + public void setWaiterLatch(OneShotLatch latch) { + this.waiter = latch; + } + + @Override + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { + + private int afterNInvocations = afterNumberInvocations; + private final OneShotLatch streamBlocker = blocker; + private final OneShotLatch streamWaiter = waiter; + + @Override + public void write(int b) throws IOException { + + if (null != waiter) { + waiter.trigger(); + } + + if 
(afterNInvocations > 0) { + --afterNInvocations; + } + + if (0 == afterNInvocations && null != streamBlocker) { + try { + streamBlocker.await(); + } catch (InterruptedException ignored) { + } + } + try { + super.write(b); + } catch (IOException ex) { + if (null != streamWaiter) { + streamWaiter.trigger(); + } + throw ex; + } + + if (0 == afterNInvocations && null != streamWaiter) { + streamWaiter.trigger(); + } + } + + @Override + public void close() { + super.close(); + if (null != streamWaiter) { + streamWaiter.trigger(); + } + } + }; + + return lastCreatedStream; + } + + @Override + public void close() throws Exception { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 0d5d091..a1adda1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.util.MathUtils; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +36,7 @@ import java.util.Set; /** * A {@link Window} that represents a time interval from {@code start} (inclusive) to - * {@code start + size} (exclusive). + * {@code end} (exclusive). 
*/ @PublicEvolving public class TimeWindow extends Window { @@ -48,14 +49,35 @@ public class TimeWindow extends Window { this.end = end; } + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs + * to this window. + * + * @return The starting timestamp of this window. + */ public long getStart() { return start; } + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it + * is the first timestamp that does not belong to this window any more. + * + * @return The exclusive end timestamp of this window. + */ public long getEnd() { return end; } + /** + * Gets the largest timestamp that still belongs to this window. + * + *

This timestamp is identical to {@code getEnd() - 1}. + * + * @return The largest timestamp that still belongs to this window. + * + * @see #getEnd() + */ @Override public long maxTimestamp() { return end - 1; @@ -77,17 +99,15 @@ public class TimeWindow extends Window { @Override public int hashCode() { - int result = (int) (start ^ (start >>> 32)); - result = 31 * result + (int) (end ^ (end >>> 32)); - return result; + return MathUtils.longToIntWithBitMixing(start + end); } @Override public String toString() { return "TimeWindow{" + - "start=" + start + - ", end=" + end + - '}'; + "start=" + start + + ", end=" + end + + '}'; } /** @@ -104,6 +124,13 @@ public class TimeWindow extends Window { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); } + // ------------------------------------------------------------------------ + // Serializer + // ------------------------------------------------------------------------ + + /** + * The serializer used to write the TimeWindow type. + */ public static class Serializer extends TypeSerializer { private static final long serialVersionUID = 1L; @@ -152,9 +179,7 @@ public class TimeWindow extends Window { @Override public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException { - long start = source.readLong(); - long end = source.readLong(); - return new TimeWindow(start, end); + return deserialize(source); } @Override @@ -179,6 +204,10 @@ public class TimeWindow extends Window { } } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + /** * Merge overlapping {@link TimeWindow}s. For use by merging * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}. 
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index ee417ac..b9028c8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -32,7 +32,9 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.async.AsyncFsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.memory.async.AsyncMemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -91,7 +93,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } enum StateBackendEnum { - MEM, FILE, ROCKSDB_FULLY_ASYNC + MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC } @BeforeClass @@ -115,12 +117,18 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @Before public void initStateBackend() throws IOException { switch (stateBackendEnum) { + case MEM_ASYNC: + this.stateBackend = new AsyncMemoryStateBackend(MAX_MEM_STATE_SIZE); + break; + case FILE_ASYNC: { + this.stateBackend = new 
AsyncFsStateBackend(tempFolder.newFolder().toURI()); + break; + } case MEM: this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE); break; case FILE: { - String backups = tempFolder.newFolder().getAbsolutePath(); - this.stateBackend = new FsStateBackend("file://" + backups); + this.stateBackend = new FsStateBackend(tempFolder.newFolder().toURI()); break; } case ROCKSDB_FULLY_ASYNC: { http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..a5bf10c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public AsyncFileBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.FILE_ASYNC); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..ef9ad37 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public AsyncMemBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.MEM_ASYNC); + } +}