From: Isuru Suriarachchi
Date: Thu, 12 Oct 2017 14:57:01 -0400
Subject: Re: Writing to an HDFS file from a Flink stream job
To: Piotr Nowojski
Cc: Aljoscha Krettek, user@flink.apache.org
Thanks for all your directions. BucketingSink worked.

Isuru

On Thu, Oct 12, 2017 at 9:05 AM, Piotr Nowojski <piotr@data-artisans.com> wrote:
I think the issue might be that writeAsText (TextOutputFormat) doesn't flush the data anywhere (it flushes only on close, which in streaming doesn't happen). You would need to use a custom output format, but as Aljoscha pointed out, BucketingSink makes more sense for streaming applications.

Piotrek
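
For reference, a minimal sketch of what the BucketingSink variant of the job could look like, per the filesystem connector docs linked below. The socket source, the bucket date format, and the batch size are placeholder assumptions, not details from this thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // BucketingSink finalizes part files on checkpoint completion, so
        // enable checkpointing; otherwise files can stay "pending" forever.
        env.enableCheckpointing(60_000);

        DataStream<String> stream = env.socketTextStream("localhost", 9999); // placeholder source

        BucketingSink<String> sink =
            new BucketingSink<>("hdfs://hadoop-master:9000/user/isuru/foo");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")); // hourly buckets (assumed)
        sink.setBatchSize(128 * 1024 * 1024L); // roll part files at ~128 MB (assumed)

        stream.addSink(sink);
        env.execute("BucketingSink example");
    }
}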
On 12 Oct 2017, at 14:58, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi Isuru,

What is the source in your job, and is the job terminating at some point or running continuously?
In general, the writeAsText()/writeAsCsv() methods should not be used because they don't work well in an infinite streaming job that might have failures and recovery, i.e. it is unclear what recovery should mean for the file. For writing to files you would use the BucketingSink: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html

Best,
Aljoscha

On 12 Oct 2017, at 14:55, Piotr Nowojski <piotr@data-artisans.com> wrote:

Hi,

Maybe this is an access-rights issue? Could you try to create and write to the same file (same directory) in some other way (manually?), using the same user and the same machine as the Flink job would?

Maybe there will be some hint in the HDFS logs?

Piotrek
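
To try that manually from code, a small sketch using the plain Hadoop client (not Flink) could look like this; the test file name is made up, and the namenode address is taken from the original mail:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteTest {
    public static void main(String[] args) throws Exception {
        // Connect to the same namenode the Flink job uses.
        FileSystem fs = FileSystem.get(
            URI.create("hdfs://hadoop-master:9000"), new Configuration());

        Path testFile = new Path("/user/isuru/manual-write-test"); // made-up test path
        try (FSDataOutputStream out = fs.create(testFile)) {
            out.writeBytes("hello hdfs\n");
            out.hflush(); // push data to the datanodes so readers can see it
        }

        // A non-zero length here suggests permissions and connectivity are fine.
        System.out.println("wrote " + fs.getFileStatus(testFile).getLen() + " bytes");
    }
}

Running it as the same OS user, on the same machine as the TaskManagers, mirrors what the Flink job does.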

On 12 Oct 2017, at 00:19, Isuru Suriarachchi <isurues@gmail.com> wrote:

Hi all,

I'm just trying to use an HDFS file as the sink for my Flink stream job. I use the following line to do so:

stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");

I have not set "fs.hdfs.hadoopconf" in my Flink configuration, as it should work with the fully qualified HDFS file name according to [1].

However, it doesn't work as expected. The file foo is created on HDFS, but it is empty, and I don't see any error logs on the Flink side either. When I use a normal file sink with a "file:///.." URL, it works fine and the data is there in the file.

Do I need any other configuration to get this working?

Thanks,
Isuru

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs