From: david westwood <david.d.westwood@gmail.com>
Date: Fri, 9 Feb 2018 10:23:23 -0500
Subject: Re: dataset sort
To: Fabian Hueske <fhueske@gmail.com>
Cc: Till Rohrmann <trohrmann@apache.org>, user <user@flink.apache.org>
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="f403045c2688faf3910564c91b4a"

--f403045c2688faf3910564c91b4a
Content-Type: text/plain; charset="UTF-8"

Thanks.

I have to stream in the historical data, and its out-of-boundedness is much
greater than (>>) that of the real-time data. I thought there was some
elegant way using mapPartition that I wasn't seeing.

On Fri, Feb 9, 2018 at 5:10 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> You can also partition by range and sort and write each partition. Once
> all partitions have been written to files, you can concatenate the files.
> As Till said, it is not possible to sort in parallel and write in order to
> a single file.
>
> Best, Fabian
>
> 2018-02-09 10:35 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>
>> Hi David,
>>
>> Flink only supports sorting within partitions. Thus, if you want to write
>> out a globally sorted dataset, you should set the parallelism to 1, which
>> effectively results in a single partition. Decreasing the parallelism of
>> an operator will cause the individual partitions to lose their sort order
>> because the individual partitions are read in a non-deterministic order.
>>
>> Cheers,
>> Till
>>
>>
>> On Thu, Feb 8, 2018 at 8:07 PM, david westwood <
>> david.d.westwood@gmail.com> wrote:
>>
>>> Hi:
>>>
>>> I would like to sort historical data using the DataSet API.
>>>
>>> env.setParallelism(10)
>>>
>>> val dataset = [(Long, String)] ..
>>> .partitionByRange(_._1)
>>> .sortPartition(_._1, Order.ASCENDING)
>>> .writeAsCsv("mydata.csv").setParallelism(1)
>>>
>>> The data in the file is out of order (sorted only locally),
>>> but
>>> .print()
>>> prints the data in the correct order. I have run a small toy sample
>>> multiple times.
>>>
>>> Is there a way to sort the entire dataset with parallelism > 1 and write
>>> it to a single file in ascending order?
--f403045c2688faf3910564c91b4a--
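
Fabian's suggestion in the thread (partition by range, sort each partition, write one file per partition, then concatenate the files) can be illustrated with a small sketch. This is a plain-Scala simulation using only the standard library, not Flink code. The object `GlobalSortSketch` and its helper names are hypothetical. The equal-width range split is an assumption made for illustration and is not how Flink itself picks range boundaries. The numbered part-file names (1, 2, ...) are likewise chosen for the sketch. The point it shows: because every key in partition i is smaller than every key in partition i+1, appending the locally sorted files in partition order yields a globally sorted file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

object GlobalSortSketch {
  type Record = (Long, String)

  // Assumption: split the key range [min, max] into n equal-width buckets,
  // so all keys in bucket i are smaller than all keys in bucket i + 1.
  def partitionByRange(data: Seq[Record], n: Int): Vector[Seq[Record]] =
    if (data.isEmpty) Vector.fill(n)(Seq.empty[Record])
    else {
      val min = data.map(_._1).min
      val max = data.map(_._1).max
      val width = (max - min) / n + 1 // guarantees bucket index < n
      val grouped = data.groupBy(r => ((r._1 - min) / width).toInt)
      Vector.tabulate(n)(i => grouped.getOrElse(i, Seq.empty[Record]))
    }

  // Sort each partition locally and write it to its own file (named 1, 2, ...).
  def writeSortedPartitions(parts: Vector[Seq[Record]], dir: Path): Vector[Path] =
    parts.zipWithIndex.map { case (part, i) =>
      val text = part.sortBy(_._1).map { case (k, v) => s"$k,$v\n" }.mkString
      Files.write(dir.resolve((i + 1).toString), text.getBytes(StandardCharsets.UTF_8))
    }

  // Append the part files in partition order to produce one sorted file.
  def concatenate(files: Vector[Path], target: Path): Path = {
    Files.write(target, Array.emptyByteArray) // create or truncate the target
    files.foreach { f =>
      Files.write(target, Files.readAllBytes(f), StandardOpenOption.APPEND)
    }
    target
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(42L -> "d", 7L -> "a", 99L -> "e", 13L -> "b", 21L -> "c")
    val dir = Files.createTempDirectory("sorted-parts")
    val parts = writeSortedPartitions(partitionByRange(data, 3), dir)
    val out = concatenate(parts, dir.resolve("mydata.csv"))
    print(new String(Files.readAllBytes(out), StandardCharsets.UTF_8))
  }
}
```

If the files were appended in any other order, each block would still be sorted internally but the result as a whole would not be, which matches the "local order" symptom described in the original question.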