Date: Wed, 15 Mar 2017 01:43:41 +0000 (UTC)
From: "Syinchwun Leo (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Comment Edited] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

    [ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925393#comment-15925393 ]

Syinchwun Leo edited comment on FLINK-5756 at 3/15/17 1:43 AM:
---------------------------------------------------------------

Is it possible to avoid using the merge() operation? I notice that RocksDB's get() returns a byte array. My idea is that when add() is called on RocksDBListState, it first calls get() to fetch the existing byte array, appends the new value's serialized byte[] to that array, and then writes the result back to RocksDB. This way there is only ever one byte[] stored under the key.
I haven't tested the idea yet; the performance may not be ideal, and the approach is somewhat awkward.

> When there are many values under the same key in ListState, RocksDBStateBackend performances poor
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the same key in ListState, the windowState.get() operation performs very poorly. I also tested RocksDB itself, using version 4.11.2, and its performance is also very poor. The problem is likely related to RocksDB's own get() operation after merge() has been used. It may hurt the performance of window operations that keep their contents in ListState when the window is very large. I merged 50000 values under the same key in RocksDB directly, and a single get() then took 120 seconds.
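>
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:
> {code}
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
>
> // `env`, `SEvent` and `WindowStatistic` are defined elsewhere in the job; they are not shown in the report.
> class SEventSource extends RichSourceFunction[SEvent] {
>   @volatile private var running = true
>   private var count = 0L
>   private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (running) {
>       // Emit 5000 events, then pause for one second.
>       for (_ <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
>
>   // SourceFunction declares cancel() as abstract, so the source must implement it.
>   override def cancel(): Unit = running = false
> }
>
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     // Timestamps and watermarks both follow the wall clock.
>     override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
>     override def extractTimestamp(t: SEvent, l: Long): Long = System.currentTimeMillis()
>   })
>   // Every event has the same first field (1), so all events end up under one key.
>   .keyBy(0)
>   // 20 s windows sliding every 2 s: each event belongs to 10 windows, and each
>   // window collects roughly 100,000 events (20 s x 5000 events/s).
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> ////////////////////////////////////
> The RocksDB test code:
> {code}
> import org.rocksdb._
>
> // The wrapper name is illustrative; the original object/main header was not included in the report.
> object RocksDBMergeTest {
>   def main(args: Array[String]): Unit = {
>     RocksDB.loadLibrary()
>
>     // Append-style merge operator: get() returns the merged values concatenated with a delimiter.
>     val stringAppendOperator = new StringAppendOperator
>     val options = new Options()
>     options.setCompactionStyle(CompactionStyle.LEVEL)
>       .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>       .setLevelCompactionDynamicLevelBytes(true)
>       .setIncreaseParallelism(4)
>       .setUseFsync(true)
>       .setMaxOpenFiles(-1)
>       .setCreateIfMissing(true)
>       .setMergeOperator(stringAppendOperator)
>     val write_options = new WriteOptions
>     write_options.setSync(false)
>     val rocksDB = RocksDB.open(options, "/******/Data/")
>
>     val key = "key"
>     val value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
>
>     // Merge 50,000 values under a single key.
>     val beginmerge = System.currentTimeMillis()
>     for (i <- 0 until 50000) {
>       rocksDB.merge(write_options, key.getBytes(), ("s" + i + value).getBytes())
>       //rocksDB.put(key.getBytes, value.getBytes)
>     }
>     println("finish")
>
>     // One get(), which may have to apply every pending merge operand for the key
>     // (unless compaction has already combined them).
>     val begin = System.currentTimeMillis()
>     rocksDB.get(key.getBytes)
>     val end = System.currentTimeMillis()
>     println("merge cost:" + (begin - beginmerge))
>     println("Time consuming:" + (end - begin))
>   }
> }
> {code}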
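The trade-off between the merge-based append used in the test above and the read-modify-write append proposed in the comment can be illustrated with a toy in-memory model. This is a hypothetical sketch, not RocksDB or Flink code: the `ListAppendModel` object, its `MergeStyleStore` and `ReadModifyWriteStore` classes, the `bytesWritten` counter, and the single ',' delimiter are all assumptions made for illustration. The merge-style store keeps add() cheap and defers the work to get(), which must fold every pending operand; the read-modify-write store keeps exactly one byte array per key, so get() is a single lookup, but every add() rewrites the whole list.

```scala
import java.io.ByteArrayOutputStream
import scala.collection.mutable

// Hypothetical toy model of two ways to append to a list stored under one key.
// Illustration only: this is not RocksDB or Flink code.
object ListAppendModel {
  // Assumed single-byte separator between list elements.
  val Delimiter: Int = ','.toInt

  // Joins elements into one byte array, separated by the delimiter.
  private def join(parts: Iterable[Array[Byte]]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    var first = true
    for (p <- parts) {
      if (!first) out.write(Delimiter)
      out.write(p, 0, p.length)
      first = false
    }
    out.toByteArray
  }

  // Merge style: add() only records one more operand (a blind write);
  // get() has to fold all pending operands for the key into one value.
  final class MergeStyleStore {
    private val operands =
      mutable.Map.empty[String, mutable.ArrayBuffer[Array[Byte]]]
    var bytesWritten = 0L // total bytes written by add() calls

    def add(key: String, value: Array[Byte]): Unit = {
      operands.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Array[Byte]]) += value.clone()
      bytesWritten += value.length
    }

    def get(key: String): Option[Array[Byte]] =
      operands.get(key).map(ops => join(ops))
  }

  // Read-modify-write style: add() reads the current list, appends the new
  // element and writes the whole list back, so one value exists per key.
  final class ReadModifyWriteStore {
    private val values = mutable.Map.empty[String, Array[Byte]]
    var bytesWritten = 0L // total bytes written by add() calls

    def add(key: String, value: Array[Byte]): Unit = {
      val updated = values.get(key) match {
        case Some(existing) => join(Seq(existing, value))
        case None           => value.clone()
      }
      values(key) = updated
      bytesWritten += updated.length // the full list is rewritten every time
    }

    def get(key: String): Option[Array[Byte]] = values.get(key)
  }
}
```

Both stores return the same bytes for the same sequence of add() calls; what differs is where the cost lands. With n elements of s bytes each, the k-th read-modify-write add() writes k * s + (k - 1) bytes, so the total is s * n(n+1)/2 + n(n-1)/2 and grows quadratically in n, while the merge-style store writes only n * s bytes but pays for the fold on each get(). The model ignores RocksDB internals such as compaction, which can combine merge operands ahead of time.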