From issues-return-148407-archive-asf-public=cust-asf.ponee.io@flink.apache.org Thu Jan 18 14:37:45 2018
Mailing-List: contact issues-help@flink.apache.org; run by ezmlm
Reply-To: dev@flink.apache.org
Delivered-To: mailing list issues@flink.apache.org
From: StefanRRichter
To: issues@flink.incubator.apache.org
Reply-To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request #5281: [FLINK-7938] [State Backend] support addAll() in L...
Message-Id: <20180118133740.C64B5DFD7B@git1-us-west.apache.org>
Date: Thu, 18 Jan 2018 13:37:40 +0000 (UTC)

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r162343086

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
    @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
     			try {
     				writeCurrentKeyWithGroupAndNamespace();
     				byte[] key = keySerializationStream.toByteArray();
    -				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    -				List<byte[]> bytes = new ArrayList<>(values.size());
    -				for (V value : values) {
    -					keySerializationStream.reset();
    -					valueSerializer.serialize(value, out);
    -					bytes.add(keySerializationStream.toByteArray());
    +				byte[] premerge = getPreMergedValue(values);
    +				if (premerge != null) {
    +					backend.db.put(columnFamily, writeOptions, key, premerge);
    +				} else {
    +					throw new IOException("Failed pre-merge values in update()");
     				}
    +			} catch (IOException | RocksDBException e) {
    +				throw new RuntimeException("Error while updating data to RocksDB", e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void addAll(List<V> values) throws Exception {
    +		if (values != null && !values.isEmpty()) {
    +			try {
    +				writeCurrentKeyWithGroupAndNamespace();
    +				byte[] key = keySerializationStream.toByteArray();
    -				byte[] premerge = MergeUtils.merge(bytes);
    +				byte[] premerge = getPreMergedValue(values);
     				if (premerge != null) {
    -					backend.db.put(columnFamily, writeOptions, key, premerge);
    +					backend.db.merge(columnFamily, writeOptions, key, premerge);
     				} else {
    -					throw new IOException("Failed pre-merge values");
    +					throw new IOException("Failed pre-merge values in addAll()");
     				}
     			} catch (IOException | RocksDBException e) {
     				throw new RuntimeException("Error while updating data to RocksDB", e);
     			}
     		}
     	}
    +
    +	private byte[] getPreMergedValue(List<V> values) throws IOException {
    +		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    +
    +		List<byte[]> bytes = new ArrayList<>(values.size());
    --- End diff --
    
    Interesting, do you have an idea why it did not work? I think it should be possible. In general, I am not a big fan of changing this code twice when we already assume that we will do an overhaul of that part, but we can do it this time if it makes your life easier. Sorry that we could not get it done for the meetup, but I was blocked by another important matter :-(

---
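
[Editor's note] For context on the diff above: the pre-merge step serializes a list of values into a single byte array so that one `db.put()` (overwrite, in `update()`) or one `db.merge()` (append, in `addAll()`) covers the whole batch, instead of issuing one RocksDB operation per element. The sketch below illustrates the idea in isolation. It is a hypothetical stand-in, not Flink's actual `getPreMergedValue()`: the `Long` serialization and the single `','` delimiter byte (which a stringappend-style merge operator would use to join operands) are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

public class PreMergeSketch {

    // Assumed delimiter: a stringappend-style merge operator joins
    // merge operands with a single separator byte.
    private static final byte DELIMITER = ',';

    // Hypothetical stand-in for getPreMergedValue(values): serialize each
    // value and join the serialized chunks with the delimiter, so a single
    // put() or merge() call carries the whole batch. Returns null for an
    // empty batch, mirroring the null check in the diff.
    static byte[] preMerge(List<Long> values) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        boolean first = true;
        for (Long v : values) {
            if (!first) {
                bos.write(DELIMITER);
            }
            // Stand-in for valueSerializer.serialize(value, out).
            out.writeLong(v);
            first = false;
        }
        return values.isEmpty() ? null : bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // 3 longs (8 bytes each) + 2 delimiter bytes = 26 bytes.
        byte[] merged = preMerge(List.of(1L, 2L, 3L));
        System.out.println(merged.length);
    }
}
```

The key distinction the diff makes is then purely in how this pre-merged blob is written: `update()` replaces the stored list with it via `put()`, while `addAll()` appends it to the existing list via `merge()`.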