Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 08E1C200C7F for ; Wed, 24 May 2017 14:51:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 078D3160B9C; Wed, 24 May 2017 12:51:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 08711160BB4 for ; Wed, 24 May 2017 14:51:50 +0200 (CEST) Received: (qmail 43523 invoked by uid 500); 24 May 2017 12:51:50 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 43501 invoked by uid 99); 24 May 2017 12:51:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 May 2017 12:51:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1889E024D; Wed, 24 May 2017 12:51:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Wed, 24 May 2017 12:51:50 -0000 Message-Id: In-Reply-To: <0595f63bb0754f34aa541588b7260849@git.apache.org> References: <0595f63bb0754f34aa541588b7260849@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling archived-at: Wed, 24 May 2017 12:51:52 -0000 [FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36830ada Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36830ada Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36830ada Branch: refs/heads/master Commit: 36830adacecc94beb8968eaacabda11cd91bb2de Parents: 7639d49 Author: Stefan Richter Authored: Wed May 24 10:15:19 2017 +0200 Committer: Stefan Richter Committed: Wed May 24 14:51:27 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 51 ++++++++++-------- .../state/heap/HeapKeyedStateBackend.java | 2 +- .../test/checkpointing/RescalingITCase.java | 55 +++++++++++++------- 3 files changed, 65 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 51255ab..053c820 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -155,7 +155,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ - private Map>> kvStateInformation; + private final Map>> kvStateInformation; /** * Map of state names to their corresponding restored state meta info. @@ -163,7 +163,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * TODO this map can be removed when eager-state registration is in place. * TODO we currently need this cached to check state migration strategies when new serializers are registered. */ - private Map> restoredKvStateMetaInfos; + private final Map> restoredKvStateMetaInfos; /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; @@ -172,7 +172,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private final boolean enableIncrementalCheckpointing; /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ - private final SortedMap> materializedSstFiles = new TreeMap<>(); + private final SortedMap> materializedSstFiles; /** The identifier of the last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -221,8 +221,10 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { throw new IOException("Error cleaning RocksDB data directory.", e); } - keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; - kvStateInformation = new HashMap<>(); + this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; + this.kvStateInformation = new HashMap<>(); + this.restoredKvStateMetaInfos = new HashMap<>(); + this.materializedSstFiles = new TreeMap<>(); } /** @@ -249,6 +251,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { } kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); try { db.close(); @@ -826,7 +829,9 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); // use the last completed checkpoint as the comparison base. - baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + synchronized (stateBackend.materializedSstFiles) { + baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + } // save meta data for (Map.Entry>> stateMetaInfoEntry @@ -885,7 +890,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { - synchronized (stateBackend.asyncSnapshotLock) { + synchronized (stateBackend.materializedSstFiles) { stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); } @@ -943,6 +948,10 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); + try { if (restoreState == null || restoreState.isEmpty()) { createDB(); @@ -964,7 +973,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { @Override public void notifyCheckpointComplete(long completedCheckpointId) { - synchronized (asyncSnapshotLock) { + synchronized (materializedSstFiles) { if (completedCheckpointId < lastCompletedCheckpointId) { return; } @@ -1125,13 +1134,15 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { List> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo : restoredMetaInfos) { - if (!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName())) { + Tuple2> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); + + if (registeredColumn == null) { ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), rocksDBKeyedStateBackend.columnOptions); @@ -1147,14 +1158,13 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - rocksDBKeyedStateBackend.kvStateInformation.put( - stateMetaInfo.getName(), - new Tuple2>(columnFamily, stateMetaInfo)); + registeredColumn = new Tuple2>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - currentStateHandleKVStateColumnFamilies.add(columnFamily); } else { // TODO with eager state registration in place, check here for serializer migration strategies } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); } } @@ -1313,8 +1323,6 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { List columnFamilyDescriptors = new ArrayList<>(); - stateBackend.restoredKvStateMetaInfos = new HashMap<>(stateMetaInfoSnapshots.size()); - for (RegisteredKeyedBackendStateMetaInfo.Snapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( @@ -1424,7 +1432,9 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { // use the restore sst files as the base for succeeding checkpoints - stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); + } stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); } @@ -1890,11 +1900,6 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { Preconditions.checkState(1 == namedStates.size(), "Only one element expected here."); DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader); - // clear k/v state information before filling it - kvStateInformation.clear(); - - restoredKvStateMetaInfos = new HashMap<>(namedStates.size()); - // first get the column family mapping int numColumns = inputView.readInt(); Map> columnFamilyMapping = new HashMap<>(numColumns); http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index ada6377..d4ba204 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -399,7 +399,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " + "Aborting now since state migration is currently not available"); } http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 88dd1dd..9df0d1a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -57,10 +56,12 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; @@ -81,15 +82,23 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -/** - * TODO : parameterize to test all different state backends! - */ +@RunWith(Parameterized.class) public class RescalingITCase extends TestLogger { private static final int numTaskManagers = 2; private static final int slotsPerTaskManager = 2; private static final int numSlots = numTaskManagers * slotsPerTaskManager; + @Parameterized.Parameters + public static Object[] data() { + return new Object[]{"filesystem", "rocksdb"}; + } + + @Parameterized.Parameter + public String backend; + + private String currentBackend = null; + enum OperatorCheckpointMethod { NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED } @@ -99,25 +108,32 @@ public class RescalingITCase extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - @BeforeClass - public static void setup() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); + @Before + public void setup() throws Exception { + // detect parameter change + if (currentBackend != backend) { + shutDownExistingCluster(); + + currentBackend = backend; - final File checkpointDir = temporaryFolder.newFolder(); - final File savepointDir = temporaryFolder.newFolder(); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); - config.setString(CoreOptions.STATE_BACKEND, "filesystem"); - config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); - config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); + final File checkpointDir = temporaryFolder.newFolder(); + final File savepointDir = temporaryFolder.newFolder(); - cluster = new TestingCluster(config); - cluster.start(); + config.setString(CoreOptions.STATE_BACKEND, currentBackend); + config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); + + cluster = new TestingCluster(config); + cluster.start(); + } } @AfterClass - public static void teardown() { + public static void shutDownExistingCluster() { if (cluster != null) { cluster.shutdown(); cluster.awaitTermination(); @@ -867,6 +883,7 @@ public class RescalingITCase extends TestLogger { private static class StateSourceBase extends RichParallelSourceFunction { + private static final long serialVersionUID = 7512206069681177940L; private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1); protected volatile int counter = 0; @@ -959,7 +976,7 @@ public class RescalingITCase extends TestLogger { private static final long serialVersionUID = -359715965103593462L; private static final int NUM_PARTITIONS = 7; - private ListState counterPartitions; + private transient ListState counterPartitions; private boolean broadcast; private static int[] CHECK_CORRECT_SNAPSHOT;