From: Fabian Hueske <fhueske@gmail.com>
Date: Fri, 9 Feb 2018 16:49:36 +0100
Subject: Re: dataset sort
To: david westwood <david.d.westwood@gmail.com>
Cc: Till Rohrmann <trohrmann@apache.org>, user <user@flink.apache.org>

The reasons why this isn't working in Flink are that

* a file can only be written by a single process
* Flink does not support merging of sorted network partitions but reads round-robin from incoming network channels.

I think if you sort the historic data in parallel (without range partitioning, i.e., randomly partitioned) and write it out to multiple files, you could implement a source function that reads all files in parallel and generates ascending watermarks.
It would be important that you have as many parallel source tasks as you have files to ensure that watermarks are properly generated. Apart from that, this should result in a nicely sorted stream.
The watermark handling of the DataStream API will take care of "merging" the sorted files.
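
For illustration, a minimal sketch of such a source in Scala (the class name, the "sorted-part-<i>.csv" file naming scheme, and the "timestamp,value" record layout are assumptions made up for this example, not something the thread or Flink prescribes):

  import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
  import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
  import org.apache.flink.streaming.api.watermark.Watermark

  import scala.io.Source

  // Each parallel subtask i reads the pre-sorted file "sorted-part-i.csv".
  class SortedFileSource(basePath: String)
      extends RichParallelSourceFunction[(Long, String)] {

    @volatile private var running = true

    override def run(ctx: SourceContext[(Long, String)]): Unit = {
      val subtask = getRuntimeContext.getIndexOfThisSubtask
      val file = Source.fromFile(s"$basePath/sorted-part-$subtask.csv")
      try {
        for (line <- file.getLines() if running) {
          val Array(ts, value) = line.split(",", 2)
          // records within one file are already sorted, so timestamps ascend
          ctx.collectWithTimestamp((ts.toLong, value), ts.toLong)
          ctx.emitWatermark(new Watermark(ts.toLong))
        }
      } finally {
        file.close()
      }
    }

    override def cancel(): Unit = { running = false }
  }

With event time enabled and exactly one input file per subtask, something like env.addSource(new SortedFileSource("/data/sorted")).setParallelism(numberOfFiles) should then give a stream whose watermarks let downstream event-time operators see the records in timestamp order.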

Best, Fabian


2018-02-09 16:23 GMT+01:00 david westwood <david.d.westwood@gmail.com>:
Thanks.

I have to stream in the historical data and its out-of-boundedness >> that of the real-time data. I thought there was some elegant way using mapPartition that I wasn't seeing.

On Fri, Feb 9, 2018 at 5:10 AM, Fabian Hueske <fhueske@gmail.com> wrote:
You can also partition by range and sort and write each partition. Once all partitions have been written to files, you can concatenate the files.
As Till said, it is not possible to sort in parallel and write in order to a single file.
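
Roughly, in terms of the program from the first mail (the input source, output path, and the shell concatenation at the end are just example assumptions):

  import org.apache.flink.api.scala._
  import org.apache.flink.api.common.operators.Order

  val env = ExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(10)

  // hypothetical input; stands in for the original "val dataset = [(Long, String)] .."
  val dataset: DataSet[(Long, String)] = env.readCsvFile[(Long, String)]("historic-input.csv")

  dataset
    .partitionByRange(_._1)                 // non-overlapping key ranges per partition
    .sortPartition(_._1, Order.ASCENDING)   // sort each partition locally
    .writeAsCsv("sorted-parts")             // parallel sink: one file per partition

  env.execute()

  // then concatenate the part files in task-number order, e.g.
  //   cat sorted-parts/1 sorted-parts/2 ... sorted-parts/10 > sorted.csv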

Best, Fabian

2018-02-09 10:35 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
Hi David,

Flink only supports sorting within partitions. Thus, if you want to write out a globally sorted dataset, you should set the parallelism to 1, which effectively results in a single partition. Decreasing the parallelism of an operator will cause the individual partitions to lose their sort order because the individual partitions are read in a non-deterministic order.
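
In code, that parallelism-1 variant looks roughly like this (reusing the dataset from the original program; the output path is just an example):

  dataset
    .sortPartition(_._1, Order.ASCENDING).setParallelism(1)  // one globally sorted partition
    .writeAsCsv("sorted.csv").setParallelism(1)              // written by a single task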

Cheers,
Till

On Thu, Feb 8, 2018 at 8:07 PM, david westwood <david.d.westwood@gmail.com> wrote:

Hi:

I would like to sort historical data using the DataSet API.

env.setParallelism(10)

val dataset = [(Long, String)] ..
  .partitionByRange(_._1)
  .sortPartition(_._1, Order.ASCENDING)
  .writeAsCsv("mydata.csv").setParallelism(1)

the data is out of order (it is in local order only), but
.print()
prints the data in the correct order. I have run a small toy sample multiple times.

Is there a way to sort the entire dataset with parallelism > 1 and write it to a single file in ascending order?