Subject: Re: Dynamic partitioning for stream output
From: Kostas Kloudas
Date: Tue, 24 May 2016 14:50:32 +0200
To: user@flink.apache.org

Hi Juho,

If I understand correctly, you want a custom RollingSink that caches some
buckets, one for each topic/date key, and whenever the volume of buffered
data exceeds a limit, it flushes to disk, right?

If this is the case, then you are right that this is not currently supported
out-of-the-box, but it would be interesting to update the RollingSink to
support such scenarios.

One clarification: when you say that you want to partition by date, you mean
the date of the event, right? Not the processing time.

Kostas

> On May 24, 2016, at 1:22 PM, Juho Autio wrote:
>
> Could you suggest how to dynamically partition data with Flink streaming?
>
> We've looked at RollingSink, that takes care of writing batches to S3, but
> it doesn't allow defining the partition dynamically based on the tuple
> fields.
>
> Our data is coming from Kafka and essentially has the kafka topic and a
> date, among other fields.
>
> We'd like to consume all topics (also automatically subscribe to new ones)
> and write to S3 partitioned by topic and date, for example:
>
> s3://bucket/path/topic=topic2/date=20160522/
> s3://bucket/path/topic=topic2/date=20160523/
> s3://bucket/path/topic=topic1/date=20160522/
> s3://bucket/path/topic=topic1/date=20160523/
>
> There are two problems with RollingSink as it is now:
> - Only allows partitioning by date
> - Flushes the batch every time the path changes. In our case the stream can
> for example have a random mix of different topics and that would mean that
> RollingSink isn't able to respect the max flush size but keeps flushing the
> files pretty much on every tuple.
>
> We've thought that we could implement a sink that internally creates and
> handles multiple RollingSink instances as needed for partitions. But it
> would be great to first hear any suggestions that you might have.
>
> If we have to extend RollingSink, it would be nice to make it take a
> partitioning function as a parameter. The function would be called for each
> tuple to create the output path.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
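The "partitioning function as a parameter" idea in the quoted message could be sketched roughly as below. This is a minimal illustration, not real Flink API: the names `PartitionPathFunction` and `Event` are hypothetical, and an actual extension of RollingSink would also need the per-bucket buffering and flushing discussed above.

```java
import java.util.Objects;

public class PartitionSketch {

    // An event shaped like the data described in the thread:
    // a Kafka topic name plus an event date, among other fields.
    static final class Event {
        final String topic;
        final String date; // event date, e.g. "20160522"
        Event(String topic, String date) {
            this.topic = Objects.requireNonNull(topic);
            this.date = Objects.requireNonNull(date);
        }
    }

    // The proposed extension point: a function the sink would call for
    // each tuple to compute the bucket path relative to the base path.
    interface PartitionPathFunction<T> {
        String getPartitionPath(T element);
    }

    // Partition by topic and event date, matching the layout in the thread.
    static final PartitionPathFunction<Event> byTopicAndDate =
        e -> "topic=" + e.topic + "/date=" + e.date;

    public static void main(String[] args) {
        Event e = new Event("topic1", "20160522");
        // A sink holding base path s3://bucket/path/ would append the
        // per-tuple partition path to decide which bucket to write to.
        String fullPath = "s3://bucket/path/" + byTopicAndDate.getPartitionPath(e);
        System.out.println(fullPath); // s3://bucket/path/topic=topic1/date=20160522
    }
}
```

With such a function, the sink could keep one open bucket per distinct partition path instead of rolling on every path change, which addresses the second problem Juho describes.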