Subject: Re: [HEADS UP] Using "new" filesystem layer
To: user@beam.apache.org
From: Jean-Baptiste Onofré <jb@nanthrax.net>
Date: Fri, 5 May 2017 15:23:05 +0200

Hi guys,

Thanks Luke, I updated my pipeline like this:

  HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation().as(HadoopFileSystemOptions.class);
  HadoopFileSystemOptions.ConfigurationLocator locator =
      new HadoopFileSystemOptions.ConfigurationLocator();
  List<Configuration> configurations = locator.create(options);
  Pipeline pipeline = Pipeline.create(options);
  ...
  pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

I defined the HADOOP_CONF_DIR env variable pointing to the folder where I have
hdfs-site.xml, and it works fine.

I saw that the README.md is not up to date in hadoop-file-system; I'm preparing
a PR about that, and I'm also preparing a quick documentation about HDFS support.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto
> detect Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.
>
> On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré wrote:
>
> Hi guys,
>
> One of the key refactorings/new features we bring in the first stable release
> is the "new" Beam filesystems.
>
> I started to play with it on a couple of use cases I have in beam-samples.
>
> 1/ TextIO.write() with unbounded PCollection (stream)
>
> The first use case is TextIO write with an unbounded PCollection (good
> timing, as we had a question about this yesterday on Slack).
>
> I confirm that TextIO now supports unbounded PCollections. You have to create
> a Window and "flag" TextIO to use windowing.
>
> Here's the code snippet:
>
>   pipeline
>       .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
>       .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
>           public String apply(JmsRecord input) {
>               return input.getPayload();
>           }
>       }))
>       .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>       .apply(TextIO.write()
>           .to("/home/jbonofre/demo/beam/output/uc2")
>           .withWindowedWrites()
>           .withNumShards(3));
>
> Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses the
> JMS ack to advance the watermark, it should not be auto but client ack). I'm
> preparing a PR for JmsIO about this.
> However, the "windowed" TextIO works fine.
>
> 2/ Beam HDFS filesystem
>
> The other use case is to use the "new" Beam filesystem with TextIO,
> especially HDFS.
>
> So, in my pipeline, I'm using:
>
>   .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
>
> In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client
> dependencies:
>
>   <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.hadoop</groupId>
>     <artifactId>hadoop-client</artifactId>
>     <version>2.7.3</version>
>   </dependency>
>
> Unfortunately, when starting the pipeline, I get:
>
>   Exception in thread "main" java.lang.IllegalStateException: Unable to find
>   registrar for hdfs
>       at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
>       at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
>       at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
>       at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
>       at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
>
> I'm gonna investigate tonight and I will let you know.
>
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
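[Editor's note: a minimal sketch assembling the pieces discussed in this thread — parsing HadoopFileSystemOptions, resolving the Hadoop Configuration via the ConfigurationLocator default-value factory (which reads HADOOP_CONF_DIR / YARN_CONF_DIR, per the merged PR #2890), and writing with TextIO. It assumes the 0.7.0-SNAPSHOT APIs named above; the class name, the Create.of input, and the output path are illustrative only, not from the original messages.]

```java
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.conf.Configuration;

public class HdfsWriteSketch {
  public static void main(String[] args) {
    // Parse the options as HadoopFileSystemOptions so the HDFS filesystem
    // registrar can find a Hadoop configuration at pipeline construction time.
    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(HadoopFileSystemOptions.class);

    // With HADOOP_CONF_DIR (or YARN_CONF_DIR) exported, the locator picks up
    // hdfs-site.xml / core-site.xml from that directory, as JB describes above.
    List<Configuration> configurations =
        new HadoopFileSystemOptions.ConfigurationLocator().create(options);
    options.setHdfsConfiguration(configurations);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of("hello", "beam"))  // illustrative bounded input
        .apply(TextIO.write().to("hdfs://localhost/tmp/beam/output"));  // illustrative path
    pipeline.run().waitUntilFinish();
  }
}
```

Without the beam-sdks-java-io-hadoop-file-system dependency on the classpath (and a Configuration set on the options), the "Unable to find registrar for hdfs" exception from the thread is the expected failure mode.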