Subject: Re: Dynamic partitioning for stream output
From: Juho Autio <juho.autio@rovio.com>
To: user@flink.apache.org
Date: Wed, 25 May 2016 09:40:18 +0300

Related issue: https://issues.apache.org/jira/browse/FLINK-2672
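To make the plan quoted below more concrete, here is a minimal Java sketch
of the kind of wrapper sink we have in mind. To be clear, this is only a
sketch under assumptions: every name in it is hypothetical, it uses no Flink
or S3 APIs, and it ignores fault tolerance and checkpointing entirely.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: one in-memory buffer is kept per dynamic bucket
 * path; a bucket is flushed when it holds too many elements or has been
 * open for too long. Not Flink API, not fault tolerant.
 */
public class MultiBucketSink<T> {

    /** Called per element to build its bucket path, e.g. "topic=topic1/date=20160522". */
    public interface BucketPathFormatter<T> {
        String format(T element);
    }

    private static final class Bucket<T> {
        final List<T> elements = new ArrayList<>();
        final long openedAtMillis = System.currentTimeMillis();
    }

    private final BucketPathFormatter<T> formatter;
    private final int maxElementsPerBucket;
    private final long maxBucketAgeMillis;
    private final Map<String, Bucket<T>> openBuckets = new HashMap<>();

    public MultiBucketSink(BucketPathFormatter<T> formatter,
                           int maxElementsPerBucket,
                           long maxBucketAgeMillis) {
        this.formatter = formatter;
        this.maxElementsPerBucket = maxElementsPerBucket;
        this.maxBucketAgeMillis = maxBucketAgeMillis;
    }

    /** Would be called from the sink's invoke() for every incoming element. */
    public void write(T element) {
        String path = formatter.format(element);
        Bucket<T> bucket = openBuckets.computeIfAbsent(path, p -> new Bucket<T>());
        bucket.elements.add(element);
        boolean tooBig = bucket.elements.size() >= maxElementsPerBucket;
        boolean tooOld = System.currentTimeMillis() - bucket.openedAtMillis >= maxBucketAgeMillis;
        if (tooBig || tooOld) {
            flush(path, bucket);
        }
    }

    private void flush(String path, Bucket<T> bucket) {
        // A real implementation would hand the batch to a rolling writer for
        // s3://bucket/path/<bucketPath>/ here; the sketch just drops the bucket.
        System.out.printf("flushing %d elements to %s%n", bucket.elements.size(), path);
        openBuckets.remove(path);
    }
}

One known gap in the sketch: bucket age is only checked when a new element
arrives, so an idle bucket would never flush. A real sink would also need a
timer for that, and it would have to integrate with Flink's checkpointing
the way RollingSink does.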
On Wed, May 25, 2016 at 9:21 AM, Juho Autio <juho.autio@rovio.com> wrote:

> Thanks, indeed the desired behavior is to flush a bucket if its size
> exceeds a limit, but also if it has been open long enough. Contrary to
> the current RollingSink, we don't want to flush every time the bucket
> changes, but rather keep multiple buckets "open" as needed.
>
> In our case the date to use for partitioning comes from an event field,
> but needs to be formatted, too. The partitioning feature should be
> generic, allowing the caller to pass a function that formats the bucket
> path for each tuple.
>
> Does it seem like a valid plan to create a sink that internally caches
> multiple rolling sinks?
>
> On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas
> <k.kloudas@data-artisans.com> wrote:
>
>> Hi Juho,
>>
>> If I understand correctly, you want a custom RollingSink that caches
>> some buckets, one for each topic/date key, and whenever the volume of
>> buffered data exceeds a limit, it flushes to disk, right?
>>
>> If this is the case, then you are right that this is not currently
>> supported out-of-the-box, but it would be interesting to update the
>> RollingSink to support such scenarios.
>>
>> One clarification: when you say that you want to partition by date,
>> you mean the date of the event, right? Not the processing time.
>>
>> Kostas
>>
>>> On May 24, 2016, at 1:22 PM, Juho Autio <juho.autio@rovio.com> wrote:
>>>
>>> Could you suggest how to dynamically partition data with Flink
>>> streaming?
>>>
>>> We've looked at RollingSink, which takes care of writing batches to
>>> S3, but it doesn't allow defining the partition dynamically based on
>>> the tuple fields.
>>>
>>> Our data comes from Kafka and essentially has the Kafka topic and a
>>> date, among other fields.
>>>
>>> We'd like to consume all topics (and automatically subscribe to new
>>> ones) and write to S3 partitioned by topic and date, for example:
>>>
>>> s3://bucket/path/topic=topic2/date=20160522/
>>> s3://bucket/path/topic=topic2/date=20160523/
>>> s3://bucket/path/topic=topic1/date=20160522/
>>> s3://bucket/path/topic=topic1/date=20160523/
>>>
>>> There are two problems with RollingSink as it is now:
>>> - It only allows partitioning by date.
>>> - It flushes the batch every time the path changes. In our case the
>>> stream can contain, for example, a random mix of different topics,
>>> which means that RollingSink isn't able to respect the max flush size
>>> and keeps flushing the files on pretty much every tuple.
>>>
>>> We've thought that we could implement a sink that internally creates
>>> and handles multiple RollingSink instances as needed for the
>>> partitions. But it would be great to first hear any suggestions that
>>> you might have.
>>>
>>> If we have to extend RollingSink, it would be nice to make it take a
>>> partitioning function as a parameter. The function would be called
>>> for each tuple to create the output path.
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
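For illustration, here is hypothetical usage of the sketch above, with the
partitioning function building paths like the S3 examples in the quoted
message. Again, this is only an assumption-laden sketch: Event is an
assumed minimal POJO, not an existing class, and the flush limits are
arbitrary.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/** Assumed minimal event type, for illustration only. */
class Event {
    final String topic;
    final long eventTimeMillis;

    Event(String topic, long eventTimeMillis) {
        this.topic = topic;
        this.eventTimeMillis = eventTimeMillis;
    }
}

public class TopicDatePartitioningExample {

    public static void main(String[] args) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        // The partitioning function: called per tuple to build the bucket
        // path, e.g. "topic=topic1/date=20160522" under s3://bucket/path/.
        // The date comes from an event field, not from processing time.
        MultiBucketSink.BucketPathFormatter<Event> formatter = event ->
                "topic=" + event.topic
                        + "/date=" + dateFormat.format(new Date(event.eventTimeMillis));

        // Flush a bucket at 10,000 buffered elements or after 15 minutes.
        MultiBucketSink<Event> sink =
                new MultiBucketSink<>(formatter, 10_000, 15 * 60 * 1000L);

        sink.write(new Event("topic1", 1463875200000L)); // 2016-05-22 UTC
        sink.write(new Event("topic2", 1463961600000L)); // 2016-05-23 UTC
    }
}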