From: Srikanth <srikanth.ht@gmail.com>
Date: Tue, 16 Feb 2016 13:52:02 -0500
Subject: Re: writeAsCSV with partitionBy
To: user@flink.apache.org

Fabian,

Not sure if we are on the same page.
If I do something like the code below, it will group by field 0 and each
task will write a separate part file in parallel.

    val sink = data1.join(data2)
      .where(1).equalTo(0) { (l, r) => (l._3, r._3) }
      .partitionByHash(0)
      .writeAsCsv(pathBase + "output/test", rowDelimiter = "\n",
        fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)

This will create the folder ./output/test/<1,2,3,4...>

But what I was looking for is a Hive-style partitionBy that will output with
the folder structure

    ./output/field0=1/file
    ./output/field0=2/file
    ./output/field0=3/file
    ./output/field0=4/file

assuming field0 is an Int with unique values 1, 2, 3 & 4.

Srikanth

On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> Hi Srikanth,
>
> DataSet.partitionBy() will partition the data on the declared partition
> fields.
> If you append a DataSink with the same parallelism as the partition
> operator, the data will be written out with the defined partitioning.
> It should be possible to achieve the behavior you described using
> DataSet.partitionByHash() or partitionByRange().
>
> Best, Fabian
>
> 2016-02-12 20:53 GMT+01:00 Srikanth <srikanth.ht@gmail.com>:
>> Hello,
>>
>> Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?
>> I'm looking to save output as CSV files partitioned by two columns
>> (date and hour).
>> The partitionBy DataSet API is more to partition the data based on a
>> column for further processing.
>>
>> I'm thinking there is no direct API to do this. But what will be the
>> best way of achieving this?
>>
>> Srikanth
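To make the intended layout concrete, here is a minimal plain-Scala sketch of the routing logic I have in mind. This is not Flink API code; `HivePartitionLayout` and `partitionPaths` are hypothetical names, and the sketch only illustrates how rows would map to a `field0=<value>` directory per distinct key, rather than one part file per parallel task.

```scala
// Hypothetical illustration (plain Scala, no Flink): route each
// (field0, payload) row to a Hive-style directory based on field0.
object HivePartitionLayout {
  // Returns a map from target file path to the CSV lines it would contain.
  def partitionPaths(rows: Seq[(Int, String)],
                     base: String): Map[String, Seq[String]] =
    rows.groupBy(_._1).map { case (key, group) =>
      // One directory per distinct field0 value, e.g. ./output/field0=1/file
      (s"$base/field0=$key/file",
       group.map { case (f0, payload) => s"$f0\t$payload" })
    }
}
```

For example, `partitionPaths(Seq((1, "a"), (2, "b"), (1, "c")), "./output")` would place the two field0=1 rows together under `./output/field0=1/file` and the field0=2 row under `./output/field0=2/file`. In a real job this mapping would have to live inside a custom OutputFormat or a per-partition-value filter + sink, since writeAsCsv alone only splits by parallel task.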