beam-user mailing list archives

From Jean-Baptiste Onofré <...@nanthrax.net>
Subject Re: [HEADS UP] Using "new" filesystem layer
Date Fri, 05 May 2017 13:23:05 GMT
Hi guys,

Thanks Luke, I updated my pipeline like this:

         HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                 .withValidation()
                 .as(HadoopFileSystemOptions.class);
         // ConfigurationLocator resolves the Hadoop Configuration from the
         // HADOOP_CONF_DIR and YARN_CONF_DIR environment variables
         HadoopFileSystemOptions.ConfigurationLocator locator =
                 new HadoopFileSystemOptions.ConfigurationLocator();
         List<Configuration> configurations = locator.create(options);
         Pipeline pipeline = Pipeline.create(options);
         ...
         pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

I defined the HADOOP_CONF_DIR environment variable pointing to the folder containing 
hdfs-site.xml, and it works fine.
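
For completeness: if HADOOP_CONF_DIR is not set, my understanding is that the Hadoop 
configuration can also be set explicitly on the options. Here's a minimal, self-contained 
sketch (the class name HdfsWriteSketch is only for illustration; I'm assuming the 
setHdfsConfiguration setter on HadoopFileSystemOptions and the fs.defaultFS key, so 
adjust the URI to your cluster):

         import java.util.Collections;

         import org.apache.beam.sdk.Pipeline;
         import org.apache.beam.sdk.io.TextIO;
         import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
         import org.apache.beam.sdk.options.PipelineOptionsFactory;
         import org.apache.beam.sdk.transforms.Create;
         import org.apache.hadoop.conf.Configuration;

         public class HdfsWriteSketch {
             public static void main(String[] args) {
                 HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                         .withValidation()
                         .as(HadoopFileSystemOptions.class);

                 // Build the Hadoop Configuration by hand instead of relying on the
                 // HADOOP_CONF_DIR / YARN_CONF_DIR auto-detection (assumption: the
                 // setHdfsConfiguration setter is available on HadoopFileSystemOptions).
                 Configuration conf = new Configuration();
                 conf.set("fs.defaultFS", "hdfs://localhost");
                 options.setHdfsConfiguration(Collections.singletonList(conf));

                 Pipeline pipeline = Pipeline.create(options);
                 pipeline
                         .apply(Create.of("a", "b", "c"))
                         .apply(TextIO.write().to("hdfs://localhost/path"));
                 pipeline.run();
             }
         }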

I saw that the README.md in hadoop-file-system is not up to date; I'm preparing a PR 
for that, and I'm also preparing quick documentation about HDFS support.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto-detect
> Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.
>
> On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
>
>     Hi guys,
>
>     One of the key refactorings/new features we bring in the first stable release is
>     the "new" Beam filesystems.
>
>     I started to play with it on a couple of use cases I have in beam-samples.
>
>     1/ TextIO.write() with unbounded PCollection (stream)
>
>     The first use case is the TextIO write with unbounded PCollection (good
>     timing as we had a question yesterday about this on Slack).
>
>     I confirm that TextIO now supports unbounded PCollections. You have to apply
>     a Window and "flag" TextIO to use windowed writes.
>
>     Here's the code snippet:
>
>     pipeline
>         .apply(JmsIO.read()
>             .withConnectionFactory(connectionFactory)
>             .withQueue("BEAM"))
>         .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
>             public String apply(JmsRecord input) {
>                 return input.getPayload();
>             }
>         }))
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
>         .apply(TextIO.write()
>             .to("/home/jbonofre/demo/beam/output/uc2")
>             .withWindowedWrites()
>             .withNumShards(3));
>
>     Thanks to Dan, I found an issue in the JmsIO watermark (as it uses the JMS ack
>     to advance the watermark, it should use client ack rather than auto ack). I'm
>     preparing a PR for JmsIO about this.
>     However, the "windowed" TextIO works fine.
>
>     2/ Beam HDFS filesystem
>
>     The other use case is to use the "new" Beam filesystem with TextIO,
>     especially HDFS.
>
>     So, in my pipeline, I'm using:
>
>
>     .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
>
>     In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client
>     dependencies:
>
>             <dependency>
>                 <groupId>org.apache.beam</groupId>
>                 <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>                 <version>0.7.0-SNAPSHOT</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.hadoop</groupId>
>                 <artifactId>hadoop-client</artifactId>
>                 <version>2.7.3</version>
>             </dependency>
>
>     Unfortunately, when starting the pipeline, I get:
>
>     Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
>             at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
>             at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
>             at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
>             at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
>             at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
>
>     I'm going to investigate tonight and I will let you know.
>
>     Regards
>     JB
>     --
>     Jean-Baptiste Onofré
>     jbonofre@apache.org
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
