Date: Tue, 2 May 2017 09:41:04 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992639#comment-15992639 ]

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114282104

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
         }
     }
+    private static class RocksDBIncrementalSnapshotOperation {
+
+        private final RocksDBKeyedStateBackend stateBackend;
+
+        private final CheckpointStreamFactory checkpointStreamFactory;
+
+        private final long checkpointId;
+
+        private final long checkpointTimestamp;
+
+        private Map baseSstFiles;
+
+        private List> stateMetaInfos = new ArrayList<>();
+
+        private FileSystem backupFileSystem;
+        private Path backupPath;
+
+        private FSDataInputStream inputStream = null;
+        private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+        // new sst files since the last completed checkpoint
+        private Set newSstFileNames = new HashSet<>();
+
+        // handles to the sst files in the current snapshot
+        private Map sstFiles = new HashMap<>();
+
+        // handles to the misc files in the current snapshot
+        private Map miscFiles = new HashMap<>();
+
+        private StreamStateHandle metaStateHandle = null;
+
+        private RocksDBIncrementalSnapshotOperation(
+                RocksDBKeyedStateBackend stateBackend,
+                CheckpointStreamFactory checkpointStreamFactory,
+                long checkpointId,
+                long checkpointTimestamp) {
+
+            this.stateBackend = stateBackend;
+            this.checkpointStreamFactory = checkpointStreamFactory;
+            this.checkpointId = checkpointId;
+            this.checkpointTimestamp = checkpointTimestamp;
+        }
+
+        private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+            try {
+                final byte[] buffer = new byte[1024];
+
+                FileSystem backupFileSystem = backupPath.getFileSystem();
+                inputStream = backupFileSystem.open(filePath);
+                stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+                outputStream = checkpointStreamFactory
+                    .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+                stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+                while (true) {
+                    int numBytes = inputStream.read(buffer);
+
+                    if (numBytes == -1) {
+                        break;
+                    }
+
+                    outputStream.write(buffer, 0, numBytes);
+                }
+
+                return outputStream.closeAndGetHandle();
+            } finally {
+                if (inputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+                    inputStream.close();
+                    inputStream = null;
+                }
+
+                if (outputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                    outputStream.close();
+                    outputStream = null;
+                }
+            }
+        }
+
+        private StreamStateHandle materializeMetaData() throws Exception {
+            try {
+                outputStream = checkpointStreamFactory
+                    .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+                stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+                KeyedBackendSerializationProxy serializationProxy =
+                    new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+                DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
+
+                serializationProxy.write(out);
+
+                return outputStream.closeAndGetHandle();
+            } finally {
+                if (outputStream != null) {
+                    stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                    outputStream.close();
+                    outputStream = null;
+                }
+            }
+        }
+
+        void takeSnapshot() throws Exception {
+            // use the last completed checkpoint as the comparison base.
+            baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
+            // save meta data
+            for (Map.Entry>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
+
+                RegisteredBackendStateMetaInfo metaInfo = stateMetaInfoEntry.getValue().f1;
+
+                KeyedBackendSerializationProxy.StateMetaInfo metaInfoProxy =
+                    new KeyedBackendSerializationProxy.StateMetaInfo<>(
+                        metaInfo.getStateType(),
+                        metaInfo.getName(),
+                        metaInfo.getNamespaceSerializer(),
+                        metaInfo.getStateSerializer());
+
+                stateMetaInfos.add(metaInfoProxy);
+            }
+
+            // save state data
+            backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+            backupFileSystem = backupPath.getFileSystem();
+            if (backupFileSystem.exists(backupPath)) {
+                LOG.warn("Deleting an existing local checkpoint directory " +
+                    backupPath + ".");
+
+                backupFileSystem.delete(backupPath, true);
+            }
+
+            // create hard links of living files in the checkpoint path
+            Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
+            checkpoint.createCheckpoint(backupPath.getPath());
--- End diff --

Sorry, I was probably wrong here.
I guess the recent changes are in the CURRENT and MANIFEST files, and those are always checkpointed.


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because RocksDB is based on LSM trees, which record updates in new sst files, and all sst files are immutable. By materializing only the new sst files, we can significantly improve the performance of checkpointing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
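The point settled in this thread, that only sst files missing from the last completed checkpoint need to be copied while the small CURRENT and MANIFEST files are shipped with every checkpoint, can be illustrated with a minimal sketch. It assumes the local checkpoint directory has already been listed into plain file names, that sst files are recognized by their `.sst` suffix, that a given sst file name always denotes the same immutable content, and that a `null` base (no completed checkpoint yet) means every sst file is new. The class `IncrementalSnapshotSketch`, its `Plan` type, the `plan` method and the example file names are hypothetical illustrations, not Flink or RocksDB API; the new/reused/misc split only loosely mirrors the `newSstFileNames`, `sstFiles` and `miscFiles` fields in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not Flink's implementation: decides which files of a
// local RocksDB checkpoint directory have to be copied for an incremental checkpoint.
final class IncrementalSnapshotSketch {

    // Result of classifying one local checkpoint directory (TreeSets give stable, sorted output).
    static final class Plan {
        final Set<String> newSstFiles = new TreeSet<>();    // sst files absent from the base: copy
        final Set<String> reusedSstFiles = new TreeSet<>(); // sst files already in the base: reference only
        final Set<String> miscFiles = new TreeSet<>();      // CURRENT, MANIFEST-*, OPTIONS-*: always copy

        Set<String> filesToUpload() {
            Set<String> upload = new TreeSet<>(newSstFiles);
            upload.addAll(miscFiles);
            return upload;
        }
    }

    // Assumption: a null base means no checkpoint has completed yet, so every sst file is new.
    static Plan plan(List<String> localFiles, Set<String> baseSstFiles) {
        Set<String> base = baseSstFiles == null ? Collections.<String>emptySet() : baseSstFiles;
        Plan result = new Plan();
        for (String file : localFiles) {
            if (file.endsWith(".sst")) {
                // Assumption: sst files are immutable, so a name already present in the
                // base checkpoint refers to the same content and need not be copied again.
                if (base.contains(file)) {
                    result.reusedSstFiles.add(file);
                } else {
                    result.newSstFiles.add(file);
                }
            } else {
                // Non-sst files such as CURRENT and MANIFEST are small and may change between
                // checkpoints, so, following the review comment, they are copied every time.
                result.miscFiles.add(file);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> base = new TreeSet<>(Arrays.asList("000007.sst", "000009.sst"));
        List<String> local = Arrays.asList(
            "000007.sst", "000009.sst", "000012.sst", "CURRENT", "MANIFEST-000011", "OPTIONS-000005");

        Plan p = plan(local, base);
        System.out.println("upload: " + p.filesToUpload());
        System.out.println("reuse:  " + p.reusedSstFiles);
    }
}
```

Running `main` prints `upload: [000012.sst, CURRENT, MANIFEST-000011, OPTIONS-000005]` and `reuse:  [000007.sst, 000009.sst]`: only the one new sst file plus the non-sst files would be copied, while the two older sst files could be referenced from the base checkpoint. Comparing by file name alone is only sound because of the immutability property the issue description relies on.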