From: Stephan Ewen
Date: Wed, 31 Aug 2016 18:47:20 +0200
Subject: Re: Streaming - memory management
To: "dev@flink.apache.org"

If you use RocksDB, you will not run into OutOfMemory errors.

On Wed, Aug 31, 2016 at 6:34 PM, Fabian Hueske wrote:

> Hi Vinay,
>
> if you use user-defined state, you have to clear it manually.
> Otherwise, it will stay in the state backend (heap or RocksDB) until the
> job goes down (planned or due to an OOM error).
>
> This is especially important to keep in mind when using keyed state.
> If you have an unbounded, evolving key space, you will likely run
> out of memory.
> The job will constantly add state for each new key but won't be able to
> clean up the state for "expired" keys.
>
> You could implement such a clean-up mechanism with a custom stream
> operator.
> However, this is a very low-level interface and requires a solid
> understanding of internals like timestamps, watermarks and the
> checkpointing mechanism.
>
> The community is currently working on a state expiry feature (state will be
> discarded if it is not requested or updated for x minutes).
>
> Regarding the second question: Does state remain local after checkpointing?
> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
> remains in the operator. So the state is not gone after a checkpoint is
> completed.
>
> Hope this helps,
> Fabian
>
> 2016-08-31 18:17 GMT+02:00 Vinay Patil:
>
> > Hi Stephan,
> >
> > Just wanted to jump into this discussion regarding state.
> >
> > So do you mean that if we maintain user-defined state (for non-window
> > operators) and do not clear it explicitly, the data for that key
> > remains in RocksDB?
> >
> > What happens in case of a checkpoint? I read in the documentation that
> > after the checkpoint happens, the RocksDB data is pushed to the desired
> > location (HDFS, S3 or another FS). So for user-defined state, does the
> > data still remain in RocksDB after the checkpoint?
> >
> > Correct me if I have misunderstood this concept.
> >
> > For one of our use cases we were going to use this, but since I read the
> > above part of the documentation, we are going with Cassandra now (to
> > store records and query them for a special case).
> >
> > Regards,
> > Vinay Patil
> >
> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen wrote:
> >
> > > In streaming, memory is mainly needed for state (key/value state). The
> > > exact representation depends on the chosen StateBackend.
> > >
> > > State is explicitly released: for windows, state is cleaned up
> > > automatically (on firing / expiry); for user-defined state, keys have
> > > to be cleared explicitly (clear() method) or, in the future, will have
> > > the option to expire.
> > >
> > > The heavy workhorse for streaming state is currently RocksDB, which
> > > internally uses native (off-heap) memory to keep the data.
> > >
> > > Does that help?
> > >
> > > Stephan
> > >
> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik
> > > wrote:
> > >
> > > > As per the docs, in Batch mode, dynamic memory allocation is avoided
> > > > by storing messages being processed in ByteBuffers via Unsafe methods.
> > > >
> > > > Couldn't find any docs describing memory management in Streaming
> > > > mode. So...
> > > >
> > > > - I am wondering if this is also the case with Streaming?
> > > >
> > > > - If so, how does Flink detect that an object is no longer being used
> > > > and can be reclaimed for reuse once again?
> > > >
> > > > -roshan