From: Sameer W
Date: Mon, 25 Jul 2016 14:50:38 -0400
Subject: Question about Checkpoint Storage (RocksDB)
To: user@flink.apache.org
Hi,

My understanding of the RocksDB state backend is as follows:

When using the RocksDB state backend, checkpoints are first taken locally (on the TaskManager) using RocksDB's backup feature, by taking snapshots that are consistent read-only views on the RocksDB database. Each checkpoint is written on the task manager node and then asynchronously backed up to the remote HDFS location. When a checkpoint is committed, the corresponding records are deleted from RocksDB, allowing the RocksDB data folders to remain small. This in turn keeps each snapshot relatively small. If the task node goes away due to a failure, I assume the RocksDB database is restored from the checkpoints in remote HDFS. Since each checkpoint's state is relatively small, the restoration time from HDFS for the RocksDB database on the new task node is also relatively small.
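
For reference, the setup I have in mind looks roughly like the sketch below; the checkpoint interval and the HDFS checkpoint path are just placeholders for illustration, not my actual values:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds. With the RocksDB backend the working
        // state lives in local RocksDB instances on the TaskManagers, and the
        // checkpoint data is copied out to the HDFS URI given below.
        env.enableCheckpointing(60000);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... the job topology and env.execute(...) would follow here
    }
}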

The question is: when using really long windows (hours), if the state of the window gets very large over time, would the size of the RocksDB database grow as well? Would replication to HDFS start causing performance bottlenecks? Also, would this require a constant cycle (at each checkpoint interval?) of reading from RocksDB, adding more window elements, and writing back to RocksDB?
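
To make the scenario concrete, the kind of job I am worried about is sketched below; the types and the 6-hour window are made up for illustration. Since the window is evaluated with a plain WindowFunction, every element has to be buffered in the (RocksDB-backed) window state until the window fires:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class LongWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 2L));

        // A 6-hour window evaluated with a WindowFunction: every element is
        // kept in window state until the window fires, so the state grows
        // with the number of elements collected per key.
        events.keyBy(0)
              .timeWindow(Time.hours(6))
              .apply(new WindowFunction<Tuple2<String, Long>, Long, Tuple, TimeWindow>() {
                  @Override
                  public void apply(Tuple key, TimeWindow window,
                                    Iterable<Tuple2<String, Long>> elements,
                                    Collector<Long> out) {
                      long sum = 0;
                      for (Tuple2<String, Long> e : elements) {
                          sum += e.f1;
                      }
                      out.collect(sum);
                  }
              })
              .print();

        env.execute("long-window-sketch");
    }
}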

Outside of the read costs, is there a risk in having very long windows when you know they could collect a lot of elements? Is it safer instead to perform aggregations on top of aggregations, or to use your own custom remote store like HBase to persist the larger per-record state and use the windows only to store the keys into HBase? I mention HBase because its support for column qualifiers allows elements to be added to the same key in multiple ordered column qualifiers. Reading can also be throttled in batches of column qualifiers, allowing for better memory consumption. Is this approach used in practice?
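
To sketch what I mean by the HBase alternative (the table name, column family, row key, and payload below are made up; in a real job the value would be a serialized record written from a sink), every element of a window would be appended to that window's row under its own ordered column qualifier:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseWindowStoreSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("window_elements"))) {

            // One row per window key; each element goes into its own column
            // qualifier (here the event timestamp), so the qualifiers stay
            // ordered and can later be scanned back in bounded batches.
            long eventTimestamp = System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes("window-key-42"));
            put.addColumn(Bytes.toBytes("e"),                   // column family
                          Bytes.toBytes(eventTimestamp),        // qualifier per element
                          Bytes.toBytes("serialized-element")); // element payload
            table.put(put);
        }
    }
}

Reading back would then be, as I understand it, a Scan over that row with setBatch(...) limiting how many qualifiers come back per call.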

Thanks,
Sameer