Subject: Re: Best way to write data to HDFS by Flink
From: Hawin Jiang <hawin.jiang@gmail.com>
To: user@flink.apache.org
Date: Mon, 22 Jun 2015 14:51:11 -0700

Hi Marton,

If we receive a huge amount of data from Kafka and write it to HDFS immediately, we should use the buffer timeout described at the URL you sent.

I am not sure whether you have Flume experience. Flume can be configured with a buffer size and partitions as well. By "partition" I mean the following: for example, I want to write a one-minute buffer file to HDFS under /data/flink/year=2015/month=06/day=22/hour=21. If the partition (/data/flink/year=2015/month=06/day=22/hour=21) already exists, there is no need to create it; otherwise Flume creates it automatically, and Flume routes the incoming data to the right partition. I am not sure whether Flink also provides a similar partitioning API or configuration for this.

Thanks.

Best regards,
Hawin

On Wed, Jun 10, 2015 at 10:31 AM, Hawin Jiang <hawin.jiang@gmail.com> wrote:
> Thanks Marton,
> I will use this code to implement my testing.
>
> Best regards,
> Hawin
>
> On Wed, Jun 10, 2015 at 1:30 AM, Márton Balassi <balassi.marton@gmail.com> wrote:
>> Dear Hawin,
>>
>> You can pass an HDFS path to DataStream's and DataSet's writeAsText and
>> writeAsCsv methods.
>> I assume that you are running a streaming topology, because your source
>> is Kafka, so it would look like the following:
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> env.addSource(new PersistentKafkaSource(..))
>>    .map(/* do your operations */)
>>    .writeAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");
>>
>> Check out the relevant section of the streaming docs for more info. [1]
>>
>> [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#connecting-to-the-outside-world
>>
>> Best,
>>
>> Marton
>>
>> On Wed, Jun 10, 2015 at 10:22 AM, Hawin Jiang <hawin.jiang@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Can someone tell me what is the best way to write data to HDFS when
>>> Flink receives data from Kafka?
>>>
>>> Big thanks for your example.
>>>
>>> Best regards,
>>>
>>> Hawin
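[Archive note: the Flume-style time partitioning Hawin asks about (one directory per year/month/day/hour) can be sketched in plain Java. The base path /data/flink and the layout come from his example; the helper name and the use of java.time are illustrative assumptions, not a Flink or Flume API.]

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class PartitionPath {

    // Builds a Flume-style partition directory such as
    // /data/flink/year=2015/month=06/day=22/hour=21
    // from a base path and an epoch-millis timestamp (UTC).
    static String partitionFor(String basePath, long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format("%s/year=%d/month=%02d/day=%02d/hour=%02d",
                basePath, t.getYear(), t.getMonthValue(), t.getDayOfMonth(), t.getHour());
    }

    public static void main(String[] args) {
        // 2015-06-22T21:30:00Z falls into the hour=21 partition from the thread.
        long ts = Instant.parse("2015-06-22T21:30:00Z").toEpochMilli();
        System.out.println(partitionFor("/data/flink", ts));
        // -> /data/flink/year=2015/month=06/day=22/hour=21
    }
}
```

A sink would create the directory only if it does not exist yet, which mirrors the "create it automatically" behavior Hawin describes for Flume.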
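[Archive note: the buffer timeout mentioned in the thread trades latency for throughput — records are held until the buffer either fills up or its oldest record exceeds a time limit. A toy illustration of that size-or-age flush rule in plain Java; this is not Flink's implementation, and all names here are made up for the sketch.]

```java
import java.util.ArrayList;
import java.util.List;

public class TimeoutBuffer {
    private final int maxSize;
    private final long timeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private long oldestMillis = -1;

    TimeoutBuffer(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a record; returns the flushed batch when the buffer is full
    // or the oldest record has waited past the timeout, otherwise null.
    List<String> add(String record, long nowMillis) {
        if (pending.isEmpty()) {
            oldestMillis = nowMillis;
        }
        pending.add(record);
        boolean full = pending.size() >= maxSize;
        boolean aged = nowMillis - oldestMillis >= timeoutMillis;
        if (full || aged) {
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
        return null;
    }
}
```

A small max size or short timeout flushes small files to HDFS frequently (low latency); large values produce fewer, bigger files (high throughput), which is the trade-off behind the buffer timeout the streaming docs describe.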