Subject: Re: Sorting bounded data stream
From: Łukasz Jędrzejewski <ljd@touk.pl>
To: Jingsong Li
Cc: user
Date: Fri, 31 Jan 2020 08:10:52 +0100
Message-ID: <558da9f4-5788-ddbe-ad3e-ee206cd62be6@touk.pl>

Hi,

Thank you very much for the suggestions. I will check out the UnilateralSortMerger. However, in our case we are using checkpoints.

Kind regards,
Łukasz

On 31.01.2020 at 07:54, Jingsong Li wrote:
Hi Łukasz,

First, we are planning to design and implement the BoundedStream story, which will be discussed further in 1.11 or 1.12.

SortedMapState was discussed in FLINK-6219 (https://issues.apache.org/jira/browse/FLINK-6219), but some problems could not be solved well, so it has not been introduced.
 
For a pure BoundedStream without checkpoints, using state is not recommended, because state is mainly meant for checkpointing and brings extra overhead.

SortOperator was introduced for the Table API's BaseRow; I recommend using the UnilateralSortMerger to build your own SortOperator.
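
A minimal sketch of the general technique behind such a sorter (an external merge sort: sort bounded chunks in memory, spill each chunk as a sorted run to disk, then merge the runs with a heap). This is plain Java and does not use or reproduce the API of Flink's UnilateralSortMerger; the class name ExternalSortSketch, the maxInMemory parameter, and the assumption that records are single-line strings are all made up for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical illustration only; not Flink code.
final class ExternalSortSketch {

    /** One sorted run on disk plus the next unread record from it. */
    private static final class RunCursor implements Comparable<RunCursor> {
        final BufferedReader reader;
        String head;

        RunCursor(BufferedReader reader) throws IOException {
            this.reader = reader;
            this.head = reader.readLine();
        }

        /** Moves to the next record; returns false when the run is exhausted. */
        boolean advance() throws IOException {
            head = reader.readLine();
            return head != null;
        }

        @Override
        public int compareTo(RunCursor other) {
            return head.compareTo(other.head);
        }
    }

    /** Sorts the input while holding at most maxInMemory records in the buffer. */
    static void sort(Iterator<String> input, int maxInMemory, Consumer<String> out) {
        List<Path> runs = new ArrayList<>();
        try {
            List<String> buffer = new ArrayList<>();
            while (input.hasNext()) {
                buffer.add(input.next());
                if (buffer.size() >= maxInMemory) {
                    runs.add(spill(buffer));
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {
                runs.add(spill(buffer));
            }
            merge(runs, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Path run : runs) {
                try { Files.deleteIfExists(run); } catch (IOException ignored) { }
            }
        }
    }

    /** Sorts the buffer in memory and writes it to a temporary file as one run. */
    private static Path spill(List<String> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sort-run-", ".txt");
        try (BufferedWriter writer = Files.newBufferedWriter(run, StandardCharsets.UTF_8)) {
            for (String record : buffer) {
                writer.write(record);
                writer.newLine();
            }
        }
        return run;
    }

    /** K-way merge: the heap always yields the run whose head is smallest. */
    private static void merge(List<Path> runs, Consumer<String> out) throws IOException {
        PriorityQueue<RunCursor> heap = new PriorityQueue<>();
        try {
            for (Path run : runs) {
                RunCursor c = new RunCursor(Files.newBufferedReader(run, StandardCharsets.UTF_8));
                if (c.head != null) heap.add(c); else c.reader.close();
            }
            while (!heap.isEmpty()) {
                RunCursor c = heap.poll();
                out.accept(c.head);
                if (c.advance()) heap.add(c); else c.reader.close();
            }
        } finally {
            for (RunCursor c : heap) c.reader.close();
        }
    }
}
```

In an operator built around BoundedOneInput, the spill step would run while elements arrive and the merge step would run once the end of input is signalled. Note that the run files here are plain temporary files and are not part of any checkpoint.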


Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski <ljd@touk.pl> wrote:
Hi all,

In Flink 1.9 a couple of changes were introduced to deal with bounded
streams, e.g. the BoundedOneInput interface. I'm wondering whether it
would be doable to do some kind of global sort after receiving the end
input event on a finished data stream source, using only the DataStream API?

We have made some experiments with BoundedOneInput: buffering elements,
sorting them after receiving the end input event, and finally emitting
the sorted elements. It seems to be working as expected, though we are
having trouble sorting a big stream efficiently. One problem is the lack
of an appropriate state type, something like SortedMapState. With
MapState, the elements end up in a kind of byte order. I think it could
be possible to modify the keys to achieve the correct byte order, but
it's not trivial for every type (string, int, tuples, and so on). Do you
plan to add such a sorted state?
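
A minimal sketch of the key modification mentioned above, assuming the state backend iterates map keys in unsigned lexicographic order of their serialized bytes (that ordering is an assumption here and should be verified for the backend and serializer in use). The class OrderPreservingKeys and its methods are hypothetical helpers, not Flink API. Only int and String are covered; composite keys such as tuples need extra care (terminators or escaping between fields) and are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for illustration; not part of Flink.
final class OrderPreservingKeys {

    // Big-endian bytes of a signed int do not sort numerically as unsigned
    // bytes, because negative values start with a set high bit. Flipping the
    // sign bit moves negatives below non-negatives while keeping their order.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value ^ 0x80000000).array();
    }

    // UTF-8 bytes compared as unsigned values sort in Unicode code point
    // order. This can differ from String.compareTo (UTF-16 code unit order)
    // for characters outside the Basic Multilingual Plane.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Unsigned lexicographic comparison, i.e. the byte order that the
    // encodings above are designed for.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

With this encoding, compareUnsigned(encodeInt(-5), encodeInt(3)) is negative, whereas the raw big-endian bytes of -5 would sort after those of 3.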

In the Flink Table API there is SortOperator, but it is restricted to
BinaryRow. Would it be possible to adapt this functionality to the
streaming API for arbitrary types? What do you think?

Thanks,
Łukasz



--
Best, Jingsong Lee