flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/2] flink git commit: [FLINK-6364] [checkpoint] Additional minor review changes
Date Fri, 05 May 2017 14:30:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5795ebe18 -> 8ba5c7a37


[FLINK-6364] [checkpoint] Additional minor review changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ba5c7a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ba5c7a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ba5c7a3

Branch: refs/heads/master
Commit: 8ba5c7a37ff56cc9b60277ef13827614a1b3a10a
Parents: 6e94cf1
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Fri May 5 13:09:46 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Fri May 5 16:30:06 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 18 ++++----
 .../state/RocksDBStateBackendTest.java          |  1 -
 .../state/AbstractKeyedStateBackend.java        | 12 +----
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../runtime/state/StateBackendTestBase.java     |  6 +--
 .../util/BlockerCheckpointStreamFactory.java    | 10 ++++-
 .../api/operators/AbstractStreamOperator.java   |  2 +-
 ...tractEventTimeWindowCheckpointingITCase.java | 10 ++++-
 ...ckendEventTimeWindowCheckpointingITCase.java | 46 ++++++++++++++++++++
 9 files changed, 79 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ee5f956..b8e60cd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -705,7 +705,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private static class RocksDBIncrementalSnapshotOperation {
+	private static final class RocksDBIncrementalSnapshotOperation {
 
 		private final RocksDBKeyedStateBackend<?> stateBackend;
 
@@ -717,22 +717,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Map<String, StreamStateHandle> baseSstFiles;
 
-		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
+		private final List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
 
 		private FileSystem backupFileSystem;
 		private Path backupPath;
 
 		// Registry for all opened i/o streams
-		private CloseableRegistry closeableRegistry = new CloseableRegistry();
+		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 		// new sst files since the last completed checkpoint
-		private Map<String, StreamStateHandle> newSstFiles = new HashMap<>();
+		private final Map<String, StreamStateHandle> newSstFiles = new HashMap<>();
 
 		// old sst files which have been materialized in previous completed checkpoints
-		private Map<String, StreamStateHandle> oldSstFiles = new HashMap<>();
+		private final Map<String, StreamStateHandle> oldSstFiles = new HashMap<>();
 
 		// handles to the misc files in the current snapshot
-		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
+		private final Map<String, StreamStateHandle> miscFiles = new HashMap<>();
 
 		private StreamStateHandle metaStateHandle = null;
 
@@ -753,7 +753,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
 
 			try {
-				final byte[] buffer = new byte[1024];
+				final byte[] buffer = new byte[8 * 1024];
 
 				FileSystem backupFileSystem = backupPath.getFileSystem();
 				inputStream = backupFileSystem.open(filePath);
@@ -966,7 +966,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public void notifyOfCompletedCheckpoint(long completedCheckpointId) {
+	public void notifyCheckpointComplete(long completedCheckpointId) {
 		synchronized (asyncSnapshotLock) {
 			if (completedCheckpointId < lastCompletedCheckpointId) {
 				return;
@@ -1237,7 +1237,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
 				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
 
-				byte[] buffer = new byte[1024];
+				byte[] buffer = new byte[8 * 1024];
 				while (true) {
 					int numBytes = inputStream.read(buffer);
 					if (numBytes == -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index fad1559..99b71c5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 61f397c..4f3ed01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <K> Type of the key by which state is keyed.
  */
 public abstract class AbstractKeyedStateBackend<K>
-		implements KeyedStateBackend<K>, Snapshotable<KeyedStateHandle>, Closeable {
+		implements KeyedStateBackend<K>, Snapshotable<KeyedStateHandle>, Closeable, CheckpointListener {
 
 	/** {@link TypeSerializer} for our key. */
 	protected final TypeSerializer<K> keySerializer;
@@ -212,16 +212,6 @@ public abstract class AbstractKeyedStateBackend<K>
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
 
 	/**
-	 * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
-	 *
-	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 *
-	 * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
-	 *                   the program to fail and enter recovery.
-	 */
-	public abstract void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;
-
-	/**
 	 * @see KeyedStateBackend
 	 */
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ead89b3..aecc72e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -430,7 +430,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public void notifyOfCompletedCheckpoint(long checkpointId) {
+	public void notifyCheckpointComplete(long checkpointId) {
 		//Nothing to do
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 60f9c81..7152bfc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -43,8 +43,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -69,6 +69,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -98,7 +99,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import org.junit.rules.ExpectedException;
 
 
 /**
@@ -2235,7 +2235,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
 		streamFactory.setWaiterLatch(waiter);
 		streamFactory.setBlockerLatch(blocker);
-		streamFactory.setAfterNumberInvocations(100);
+		streamFactory.setAfterNumberInvocations(10);
 
 		AbstractKeyedStateBackend<Integer> backend = null;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 291f3ed..1e31490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -99,6 +99,14 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 				}
 			}
 
+			//We override this to ensure that writes go through the blocking #write(int) method!
+			@Override
+			public void write(byte[] b, int off, int len) throws IOException {
+				for (int i = 0; i < len; i++) {
+					write(b[off + i]);
+				}
+			}
+
 			@Override
 			public void close() {
 				super.close();
@@ -115,4 +123,4 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
 	public void close() throws Exception {
 
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d45ad42..8c1caee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -506,7 +506,7 @@ public abstract class AbstractStreamOperator<OUT>
 	@Override
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
 		if (keyedStateBackend != null) {
-			keyedStateBackend.notifyOfCompletedCheckpoint(checkpointId);
+			keyedStateBackend.notifyCheckpointComplete(checkpointId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index d91c57f..dbef01f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -94,7 +94,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
 	}
 
 	@BeforeClass
@@ -143,6 +143,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				this.stateBackend = rdb;
 				break;
 			}
+			case ROCKSDB_INCREMENTAL: {
+				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				RocksDBStateBackend rdb =
+					new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+				rdb.setDbStoragePath(rocksDb);
+				this.stateBackend = rdb;
+				break;
+			}
 
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ba5c7a3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..352f9f7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+		super(StateBackendEnum.ROCKSDB_INCREMENTAL);
+	}
+
+	@Override
+	protected int numElementsPerKey() {
+		return 3000;
+	}
+
+	@Override
+	protected int windowSize() {
+		return 1000;
+	}
+
+	@Override
+	protected int windowSlide() {
+		return 100;
+	}
+
+	@Override
+	protected int numKeys() {
+		return 100;
+	}
+}


Mime
View raw message