From: dhalperi@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Thu, 23 Feb 2017 00:20:24 -0000
Message-Id: <6f10f5323a224d7893d06a27494602fc@git.apache.org>
Subject: [1/4] beam git commit: Refactor Hadoop/HDFS IO

Repository: beam
Updated Branches:
  refs/heads/master f303b9899 -> cad84c880


Refactor Hadoop/HDFS IO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da6d7e16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da6d7e16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da6d7e16

Branch: refs/heads/master
Commit: da6d7e16464cf8ddb1a798872e79f3ff55580c9c
Parents: 2b1c084
Author: Rafal Wojdyla
Authored: Thu Feb 16 00:56:51 2017 -0500
Committer: Dan Halperin
Committed: Wed Feb 22 16:20:14 2017 -0800

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       |  11 +
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 300 ++++++++---
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 531 ++++++++++++-------
 .../sdk/io/hdfs/SerializableConfiguration.java  |  28 +-
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 ++
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  |   2 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 173 ++++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    |  39 +-
 8 files changed, 839 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index cd6cf4c..f857a22 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -105,6 +105,17 @@
     <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java index 6d30307..168bac7 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -17,25 +17,36 @@ */ package org.apache.beam.sdk.io.hdfs; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.IOException; -import java.util.Map; +import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.Random; import java.util.Set; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -46,67 +57,203 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; /** - * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output + * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output * format. * - * @param The type of keys to be written to the sink. - * @param The type of values to be written to the sink. + *

To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a Hadoop + * filesystem, use {@link HDFSFileSink#to}: specify the path (this can be any Hadoop-supported + * filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the key class K, the + * value class V, and finally the {@link SerializableFunction} to map from T to a {@link KV} of K + * and V. + * + *

{@code HDFSFileSink} can be used by {@link org.apache.beam.sdk.io.Write} to create a write + * transform. See the example below. + * + *

{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example: + * + *

+ * {@code
+ * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
+ *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
+ * avroRecordsPCollection.apply(Write.to(sink));
+ * }
+ * 
+ * + * @param the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}. + * @param the type of keys to be written to the sink via {@link FileOutputFormat}. + * @param the type of values to be written to the sink via {@link FileOutputFormat}. */ -public class HDFSFileSink extends Sink> { +@AutoValue +public abstract class HDFSFileSink extends Sink { private static final JobID jobId = new JobID( Long.toString(System.currentTimeMillis()), new Random().nextInt(Integer.MAX_VALUE)); - protected final String path; - protected final Class> formatClass; + public abstract String path(); + public abstract Class> formatClass(); + public abstract Class keyClass(); + public abstract Class valueClass(); + public abstract SerializableFunction> outputConverter(); + public abstract SerializableConfiguration serializableConfiguration(); + public @Nullable abstract String username(); + public abstract boolean validate(); + + // ======================================================================= + // Factory methods + // ======================================================================= + + public static > HDFSFileSink + to(String path, + Class formatClass, + Class keyClass, + Class valueClass, + SerializableFunction> outputConverter) { + return HDFSFileSink.builder() + .setPath(path) + .setFormatClass(formatClass) + .setKeyClass(keyClass) + .setValueClass(valueClass) + .setOutputConverter(outputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidate(true) + .build(); + } + + public static HDFSFileSink toText(String path) { + SerializableFunction> outputConverter = + new SerializableFunction>() { + @Override + public KV apply(T input) { + return KV.of(NullWritable.get(), new Text(input.toString())); + } + }; + return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter); + } + + /** + * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration + * object is altered to enable Avro output. + */ + public static HDFSFileSink, NullWritable> toAvro(String path, + final AvroCoder coder, + Configuration conf) { + SerializableFunction, NullWritable>> outputConverter = + new SerializableFunction, NullWritable>>() { + @Override + public KV, NullWritable> apply(T input) { + return KV.of(new AvroKey<>(input), NullWritable.get()); + } + }; + conf.set("avro.schema.output.key", coder.getSchema().toString()); + return to( + path, + AvroKeyOutputFormat.class, + (Class>) (Class) AvroKey.class, + NullWritable.class, + outputConverter).withConfiguration(conf); + } + + /** + * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration + * object is altered to enable Avro output. + */ + public static HDFSFileSink, NullWritable> + toAvro(String path, Schema schema, Configuration conf) { + return toAvro(path, AvroCoder.of(schema), conf); + } - // workaround to make Configuration serializable - private final Map map; + /** + * Helper to create Avro sink given {@link Class}. Keep in mind that configuration + * object is altered to enable Avro output. 
+ */ + public static HDFSFileSink, NullWritable> toAvro(String path, + Class cls, + Configuration conf) { + return toAvro(path, AvroCoder.of(cls), conf); + } - public HDFSFileSink(String path, Class> formatClass) { - this.path = path; - this.formatClass = formatClass; - this.map = Maps.newHashMap(); + // ======================================================================= + // Builder methods + // ======================================================================= + + public abstract Builder toBuilder(); + public static Builder builder() { + return new AutoValue_HDFSFileSink.Builder<>(); } - public HDFSFileSink(String path, Class> formatClass, - Configuration conf) { - this(path, formatClass); - // serialize conf to map - for (Map.Entry entry : conf) { - map.put(entry.getKey(), entry.getValue()); + /** + * AutoValue builder for {@link HDFSFileSink}. + */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setPath(String path); + public abstract Builder setFormatClass( + Class> formatClass); + public abstract Builder setKeyClass(Class keyClass); + public abstract Builder setValueClass(Class valueClass); + public abstract Builder setOutputConverter( + SerializableFunction> outputConverter); + public abstract Builder setSerializableConfiguration( + SerializableConfiguration serializableConfiguration); + public Builder setConfiguration(@Nullable Configuration configuration) { + if (configuration == null) { + configuration = new Configuration(false); + } + return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); } + public abstract Builder setUsername(String username); + public abstract Builder setValidate(boolean validate); + public abstract HDFSFileSink build(); } + public HDFSFileSink withConfiguration(@Nullable Configuration configuration) { + return this.toBuilder().setConfiguration(configuration).build(); + } + + public HDFSFileSink withUsername(@Nullable String username) { + return this.toBuilder().setUsername(username).build(); + } + + // ======================================================================= + // Sink + // ======================================================================= + @Override public void validate(PipelineOptions options) { - try { - Job job = jobInstance(); - FileSystem fs = FileSystem.get(job.getConfiguration()); - checkState(!fs.exists(new Path(path)), "Output path " + path + " already exists"); - } catch (IOException e) { - throw new RuntimeException(e); + if (validate()) { + try { + UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + FileSystem fs = FileSystem.get(new URI(path()), + SerializableConfiguration.newConfiguration(serializableConfiguration())); + checkState(!fs.exists(new Path(path())), "Output path %s already exists", path()); + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } } } @Override - public Sink.WriteOperation, ?> createWriteOperation(PipelineOptions options) { - return new HDFSWriteOperation<>(this, path, formatClass); + public Sink.WriteOperation createWriteOperation(PipelineOptions options) { + return new HDFSWriteOperation<>(this, path(), formatClass()); } - private Job jobInstance() throws IOException { - Job job = Job.getInstance(); - // deserialize map to conf - Configuration conf = job.getConfiguration(); - for (Map.Entry entry : map.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } + private Job newJob() 
throws IOException { + Job job = SerializableConfiguration.newJob(serializableConfiguration()); job.setJobID(jobId); + job.setOutputKeyClass(keyClass()); + job.setOutputValueClass(valueClass()); return job; } @@ -115,15 +262,15 @@ public class HDFSFileSink extends Sink> { // ======================================================================= /** {{@link WriteOperation}} for HDFS. */ - public static class HDFSWriteOperation extends WriteOperation, String> { + private static class HDFSWriteOperation extends WriteOperation { - private final Sink> sink; - protected final String path; - protected final Class> formatClass; + private final HDFSFileSink sink; + private final String path; + private final Class> formatClass; - public HDFSWriteOperation(Sink> sink, - String path, - Class> formatClass) { + HDFSWriteOperation(HDFSFileSink sink, + String path, + Class> formatClass) { this.sink = sink; this.path = path; this.formatClass = formatClass; @@ -131,14 +278,25 @@ public class HDFSFileSink extends Sink> { @Override public void initialize(PipelineOptions options) throws Exception { - Job job = ((HDFSFileSink) getSink()).jobInstance(); + Job job = sink.newJob(); FileOutputFormat.setOutputPath(job, new Path(path)); } @Override - public void finalize(Iterable writerResults, PipelineOptions options) throws Exception { - Job job = ((HDFSFileSink) getSink()).jobInstance(); - FileSystem fs = FileSystem.get(job.getConfiguration()); + public void finalize(final Iterable writerResults, PipelineOptions options) + throws Exception { + UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doFinalize(writerResults); + return null; + } + }); + } + + private void doFinalize(Iterable writerResults) throws Exception { + Job job = sink.newJob(); + FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration()); // If there are 0 output shards, just create output folder. if (!writerResults.iterator().hasNext()) { @@ -188,12 +346,12 @@ public class HDFSFileSink extends Sink> { } @Override - public Writer, String> createWriter(PipelineOptions options) throws Exception { + public Writer createWriter(PipelineOptions options) throws Exception { return new HDFSWriter<>(this, path, formatClass); } @Override - public Sink> getSink() { + public Sink getSink() { return sink; } @@ -208,10 +366,9 @@ public class HDFSFileSink extends Sink> { // Writer // ======================================================================= - /** {{@link Writer}} for HDFS files. 
*/ - public static class HDFSWriter extends Writer, String> { + private static class HDFSWriter extends Writer { - private final HDFSWriteOperation writeOperation; + private final HDFSWriteOperation writeOperation; private final String path; private final Class> formatClass; @@ -222,19 +379,31 @@ public class HDFSFileSink extends Sink> { private RecordWriter recordWriter; private FileOutputCommitter outputCommitter; - public HDFSWriter(HDFSWriteOperation writeOperation, - String path, - Class> formatClass) { + HDFSWriter(HDFSWriteOperation writeOperation, + String path, + Class> formatClass) { this.writeOperation = writeOperation; this.path = path; this.formatClass = formatClass; } @Override - public void open(String uId) throws Exception { + public void open(final String uId) throws Exception { + UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doOpen(uId); + return null; + } + } + ); + } + + private void doOpen(String uId) throws Exception { this.hash = uId.hashCode(); - Job job = ((HDFSFileSink) getWriteOperation().getSink()).jobInstance(); + Job job = writeOperation.sink.newJob(); FileOutputFormat.setOutputPath(job, new Path(path)); // Each Writer is responsible for writing one bundle of elements and is represented by one @@ -250,12 +419,25 @@ public class HDFSFileSink extends Sink> { } @Override - public void write(KV value) throws Exception { - recordWriter.write(value.getKey(), value.getValue()); + public void write(T value) throws Exception { + checkNotNull(recordWriter, + "Record writer can't be null. Make sure to open Writer first!"); + KV kv = writeOperation.sink.outputConverter().apply(value); + recordWriter.write(kv.getKey(), kv.getValue()); } @Override public String close() throws Exception { + return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( + new PrivilegedExceptionAction() { + @Override + public String run() throws Exception { + return doClose(); + } + }); + } + + private String doClose() throws Exception { // task/attempt successful recordWriter.close(context); outputCommitter.commitTask(context); @@ -265,7 +447,7 @@ public class HDFSFileSink extends Sink> { } @Override - public WriteOperation, String> getWriteOperation() { + public WriteOperation getWriteOperation() { return writeOperation; } http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 1affb4a..8e12561 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -18,8 +18,9 @@ package org.apache.beam.sdk.io.hdfs; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -29,20 +30,34 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import 
java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.InputSplit; @@ -52,10 +67,13 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A {@code BoundedSource} for reading files resident in a Hadoop filesystem (HDFS) using a + * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a * Hadoop file-based input format. * *

To read a {@link org.apache.beam.sdk.values.PCollection} of @@ -75,153 +93,301 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; * } * * - *

The {@link HDFSFileSource#readFrom} method is a convenience method - * that returns a read transform. For example: - * - *

- * {@code
- * PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path,
- *   MyInputFormat.class, MyKey.class, MyValue.class);
- * }
- * 
- * *

Implementation note: Since Hadoop's * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} * determines the input splits, this class extends {@link BoundedSource} rather than * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter * dictates input splits. - - * @param The type of keys to be read from the source. - * @param The type of values to be read from the source. + * @param the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}. + * @param the type of keys to be read from the source via {@link FileInputFormat}. + * @param the type of values to be read from the source via {@link FileInputFormat}. */ -public class HDFSFileSource extends BoundedSource> { +@AutoValue +public abstract class HDFSFileSource extends BoundedSource { private static final long serialVersionUID = 0L; - protected final String filepattern; - protected final Class> formatClass; - protected final Class keyClass; - protected final Class valueClass; - protected final SerializableSplit serializableSplit; + private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class); + + public abstract String filepattern(); + public abstract Class> formatClass(); + public abstract Coder coder(); + public abstract SerializableFunction, T> inputConverter(); + public abstract SerializableConfiguration serializableConfiguration(); + public @Nullable abstract SerializableSplit serializableSplit(); + public @Nullable abstract String username(); + public abstract boolean validateSource(); + + // ======================================================================= + // Factory methods + // ======================================================================= + + public static > HDFSFileSource + from(String filepattern, + Class formatClass, + Coder coder, + SerializableFunction, T> inputConverter) { + return HDFSFileSource.builder() + .setFilepattern(filepattern) + .setFormatClass(formatClass) + .setCoder(coder) + .setInputConverter(inputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidateSource(true) + .setSerializableSplit(null) + .build(); + } - /** - * Creates a {@code Read} transform that will read from an {@code HDFSFileSource} - * with the given file name or pattern ("glob") using the given Hadoop - * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, - * with key-value types specified by the given key class and value class. 
- */ - public static > Read.Bounded> readFrom( - String filepattern, Class formatClass, Class keyClass, Class valueClass) { - return Read.from(from(filepattern, formatClass, keyClass, valueClass)); + public static > HDFSFileSource, K, V> + from(String filepattern, + Class formatClass, + Class keyClass, + Class valueClass) { + KvCoder coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); + SerializableFunction, KV> inputConverter = + new SerializableFunction, KV>() { + @Override + public KV apply(KV input) { + return input; + } + }; + return HDFSFileSource., K, V>builder() + .setFilepattern(filepattern) + .setFormatClass(formatClass) + .setCoder(coder) + .setInputConverter(inputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidateSource(true) + .setSerializableSplit(null) + .build(); + } + + public static HDFSFileSource + fromText(String filepattern) { + SerializableFunction, String> inputConverter = + new SerializableFunction, String>() { + @Override + public String apply(KV input) { + return input.getValue().toString(); + } + }; + return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter); } /** - * Creates a {@code HDFSFileSource} that reads from the given file name or pattern ("glob") - * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, - * with key-value types specified by the given key class and value class. + * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration + * object is altered to enable Avro input. */ - public static > HDFSFileSource from( - String filepattern, Class formatClass, Class keyClass, Class valueClass) { - return new HDFSFileSource<>(filepattern, formatClass, keyClass, valueClass); + public static HDFSFileSource, NullWritable> + fromAvro(String filepattern, final AvroCoder coder, Configuration conf) { + Class> formatClass = castClass(AvroKeyInputFormat.class); + SerializableFunction, NullWritable>, T> inputConverter = + new SerializableFunction, NullWritable>, T>() { + @Override + public T apply(KV, NullWritable> input) { + try { + return CoderUtils.clone(coder, input.getKey().datum()); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + }; + conf.set("avro.schema.input.key", coder.getSchema().toString()); + return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf); } /** - * Create a {@code HDFSFileSource} based on a file or a file pattern specification. + * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration + * object is altered to enable Avro input. */ - protected HDFSFileSource(String filepattern, - Class> formatClass, Class keyClass, - Class valueClass) { - this(filepattern, formatClass, keyClass, valueClass, null); + public static HDFSFileSource, NullWritable> + fromAvro(String filepattern, Schema schema, Configuration conf) { + return fromAvro(filepattern, AvroCoder.of(schema), conf); } /** - * Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be - * split up further. + * Helper to read from Avro source given {@link Class}. Keep in mind that configuration + * object is altered to enable Avro input. 
*/ - protected HDFSFileSource(String filepattern, - Class> formatClass, Class keyClass, - Class valueClass, SerializableSplit serializableSplit) { - this.filepattern = filepattern; - this.formatClass = formatClass; - this.keyClass = keyClass; - this.valueClass = valueClass; - this.serializableSplit = serializableSplit; + public static HDFSFileSource, NullWritable> + fromAvro(String filepattern, Class cls, Configuration conf) { + return fromAvro(filepattern, AvroCoder.of(cls), conf); } - public String getFilepattern() { - return filepattern; + // ======================================================================= + // Builder methods + // ======================================================================= + + public abstract HDFSFileSource.Builder toBuilder(); + public static HDFSFileSource.Builder builder() { + return new AutoValue_HDFSFileSource.Builder<>(); } - public Class> getFormatClass() { - return formatClass; + /** + * AutoValue builder for {@link HDFSFileSource}. + */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setFilepattern(String filepattern); + public abstract Builder setFormatClass( + Class> formatClass); + public abstract Builder setCoder(Coder coder); + public abstract Builder setInputConverter( + SerializableFunction, T> inputConverter); + public abstract Builder setSerializableConfiguration( + SerializableConfiguration serializableConfiguration); + public Builder setConfiguration(Configuration configuration) { + if (configuration == null) { + configuration = new Configuration(false); + } + return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); + } + public abstract Builder setSerializableSplit(SerializableSplit serializableSplit); + public abstract Builder setUsername(@Nullable String username); + public abstract Builder setValidateSource(boolean validate); + public abstract HDFSFileSource build(); } - public Class getKeyClass() { - return keyClass; + public HDFSFileSource withConfiguration(@Nullable Configuration configuration) { + return this.toBuilder().setConfiguration(configuration).build(); } - public Class getValueClass() { - return valueClass; + public HDFSFileSource withUsername(@Nullable String username) { + return this.toBuilder().setUsername(username).build(); } + // ======================================================================= + // BoundedSource + // ======================================================================= + @Override - public void validate() { - checkNotNull(filepattern, "need to set the filepattern of a HDFSFileSource"); - checkNotNull(formatClass, "need to set the format class of a HDFSFileSource"); - checkNotNull(keyClass, "need to set the key class of a HDFSFileSource"); - checkNotNull(valueClass, "need to set the value class of a HDFSFileSource"); + public List> splitIntoBundles( + final long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + if (serializableSplit() == null) { + List inputSplits = UGIHelper.getBestUGI(username()).doAs( + new PrivilegedExceptionAction>() { + @Override + public List run() throws Exception { + return computeSplits(desiredBundleSizeBytes, serializableConfiguration()); + } + }); + return Lists.transform(inputSplits, + new Function>() { + @Override + public BoundedSource apply(@Nullable InputSplit inputSplit) { + SerializableSplit serializableSplit = new SerializableSplit(inputSplit); + return HDFSFileSource.this.toBuilder() + .setSerializableSplit(serializableSplit) + .build(); + } + }); + } else 
{ + return ImmutableList.of(this); + } } @Override - public List>> splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - if (serializableSplit == null) { - return Lists.transform(computeSplits(desiredBundleSizeBytes), - new Function>>() { + public long getEstimatedSizeBytes(PipelineOptions options) { + long size = 0; + + try { + // If this source represents a split from splitIntoBundles, then return the size of the split, + // rather then the entire input + if (serializableSplit() != null) { + return serializableSplit().getSplit().getLength(); + } + + size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction() { @Override - public BoundedSource> apply(@Nullable InputSplit inputSplit) { - return new HDFSFileSource<>(filepattern, formatClass, keyClass, - valueClass, new SerializableSplit(inputSplit)); + public Long run() throws Exception { + long size = 0; + Job job = SerializableConfiguration.newJob(serializableConfiguration()); + for (FileStatus st : listStatus(createFormat(job), job)) { + size += st.getLen(); + } + return size; } }); - } else { - return ImmutableList.of(this); + } catch (IOException e) { + LOG.warn( + "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e); + // ignore, and return 0 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn( + "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e); + // ignore, and return 0 } + return size; } - private FileInputFormat createFormat(Job job) throws IOException, IllegalAccessException, - InstantiationException { - Path path = new Path(filepattern); - FileInputFormat.addInputPath(job, path); - return formatClass.newInstance(); + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + this.validate(); + return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit()); + } + + @Override + public void validate() { + if (validateSource()) { + try { + UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + FileSystem fs = FileSystem.get(new URI(filepattern()), + SerializableConfiguration.newConfiguration(serializableConfiguration())); + FileStatus[] fileStatuses = fs.globStatus(new Path(filepattern())); + checkState( + fileStatuses != null && fileStatuses.length > 0, + "Unable to find any files matching %s", filepattern()); + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } } - protected List computeSplits(long desiredBundleSizeBytes) throws IOException, - IllegalAccessException, InstantiationException { - Job job = Job.getInstance(); + @Override + public Coder getDefaultOutputCoder() { + return coder(); + } + + // ======================================================================= + // Helpers + // ======================================================================= + + private List computeSplits(long desiredBundleSizeBytes, + SerializableConfiguration serializableConfiguration) + throws IOException, IllegalAccessException, InstantiationException { + Job job = SerializableConfiguration.newJob(serializableConfiguration); FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); return createFormat(job).getSplits(job); } - @Override - public BoundedReader> createReader(PipelineOptions options) throws 
IOException { - this.validate(); - - if (serializableSplit == null) { - return new HDFSFileReader<>(this, filepattern, formatClass); - } else { - return new HDFSFileReader<>(this, filepattern, formatClass, - serializableSplit.getSplit()); - } + private FileInputFormat createFormat(Job job) + throws IOException, IllegalAccessException, InstantiationException { + Path path = new Path(filepattern()); + FileInputFormat.addInputPath(job, path); + return formatClass().newInstance(); } - @Override - public Coder> getDefaultOutputCoder() { - return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); + private List listStatus(FileInputFormat format, Job job) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + // FileInputFormat#listStatus is protected, so call using reflection + Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); + listStatus.setAccessible(true); + @SuppressWarnings("unchecked") + List stat = (List) listStatus.invoke(format, job); + return stat; } @SuppressWarnings("unchecked") - private Coder getDefaultCoder(Class c) { + private static Coder getDefaultCoder(Class c) { if (Writable.class.isAssignableFrom(c)) { Class writableClass = (Class) c; return (Coder) WritableCoder.of(writableClass); @@ -232,82 +398,46 @@ public class HDFSFileSource extends BoundedSource> { throw new IllegalStateException("Cannot find coder for " + c); } - // BoundedSource - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - long size = 0; - - try { - // If this source represents a split from splitIntoBundles, then return the size of the split, - // rather then the entire input - if (serializableSplit != null) { - return serializableSplit.getSplit().getLength(); - } - - Job job = Job.getInstance(); // new instance - for (FileStatus st : listStatus(createFormat(job), job)) { - size += st.getLen(); - } - } catch (IOException | NoSuchMethodException | InvocationTargetException - | IllegalAccessException | InstantiationException e) { - // ignore, and return 0 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore, and return 0 - } - return size; + @SuppressWarnings("unchecked") + private static Class castClass(Class aClass) { + return (Class) aClass; } - private List listStatus(FileInputFormat format, - JobContext jobContext) throws NoSuchMethodException, InvocationTargetException, - IllegalAccessException { - // FileInputFormat#listStatus is protected, so call using reflection - Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); - listStatus.setAccessible(true); - @SuppressWarnings("unchecked") - List stat = (List) listStatus.invoke(format, jobContext); - return stat; - } + // ======================================================================= + // BoundedReader + // ======================================================================= - static class HDFSFileReader extends BoundedSource.BoundedReader> { + private static class HDFSFileReader extends BoundedSource.BoundedReader { - private final BoundedSource> source; + private final HDFSFileSource source; private final String filepattern; - private final Class> formatClass; - protected Job job; + private final Class> formatClass; + private final Job job; - private FileInputFormat format; - private TaskAttemptContext attemptContext; private List splits; private ListIterator splitsIterator; + private Configuration conf; - protected RecordReader currentReader; + private 
FileInputFormat format; + private TaskAttemptContext attemptContext; + private RecordReader currentReader; private KV currentPair; - private volatile boolean done = false; - - /** - * Create a {@code HDFSFileReader} based on a file or a file pattern specification. - */ - public HDFSFileReader(BoundedSource> source, String filepattern, - Class> formatClass) throws IOException { - this(source, filepattern, formatClass, null); - } - /** - * Create a {@code HDFSFileReader} based on a single Hadoop input split. - */ - public HDFSFileReader(BoundedSource> source, String filepattern, - Class> formatClass, InputSplit split) - throws IOException { + HDFSFileReader( + HDFSFileSource source, + String filepattern, + Class> formatClass, + SerializableSplit serializableSplit) + throws IOException { this.source = source; this.filepattern = filepattern; this.formatClass = formatClass; - if (split != null) { - this.splits = ImmutableList.of(split); + this.job = SerializableConfiguration.newJob(source.serializableConfiguration()); + + if (serializableSplit != null) { + this.splits = ImmutableList.of(serializableSplit.getSplit()); this.splitsIterator = splits.listIterator(); } - this.job = Job.getInstance(); // new instance } @Override @@ -315,21 +445,19 @@ public class HDFSFileSource extends BoundedSource> { Path path = new Path(filepattern); FileInputFormat.addInputPath(job, path); + conf = job.getConfiguration(); try { - @SuppressWarnings("unchecked") - FileInputFormat f = (FileInputFormat) formatClass.newInstance(); - this.format = f; + format = formatClass.newInstance(); } catch (InstantiationException | IllegalAccessException e) { throw new IOException("Cannot instantiate file input format " + formatClass, e); } - this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(), - new TaskAttemptID()); + attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID()); if (splitsIterator == null) { - this.splits = format.getSplits(job); - this.splitsIterator = splits.listIterator(); + splits = format.getSplits(job); + splitsIterator = splits.listIterator(); } - this.conf = job.getConfiguration(); + return advance(); } @@ -342,7 +470,7 @@ public class HDFSFileSource extends BoundedSource> { } else { while (splitsIterator.hasNext()) { // advance the reader and see if it has records - InputSplit nextSplit = splitsIterator.next(); + final InputSplit nextSplit = splitsIterator.next(); @SuppressWarnings("unchecked") RecordReader reader = (RecordReader) format.createRecordReader(nextSplit, attemptContext); @@ -350,7 +478,13 @@ public class HDFSFileSource extends BoundedSource> { currentReader.close(); } currentReader = reader; - currentReader.initialize(nextSplit, attemptContext); + UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + currentReader.initialize(nextSplit, attemptContext); + return null; + } + }); if (currentReader.nextKeyValue()) { currentPair = nextPair(); return true; @@ -360,7 +494,6 @@ public class HDFSFileSource extends BoundedSource> { } // either no next split or all readers were empty currentPair = null; - done = true; return false; } } catch (InterruptedException e) { @@ -369,26 +502,12 @@ public class HDFSFileSource extends BoundedSource> { } } - @SuppressWarnings("unchecked") - protected KV nextPair() throws IOException, InterruptedException { - K key = currentReader.getCurrentKey(); - V value = currentReader.getCurrentValue(); - // clone Writable objects since they are reused between calls to 
RecordReader#nextKeyValue - if (key instanceof Writable) { - key = (K) WritableUtils.clone((Writable) key, conf); - } - if (value instanceof Writable) { - value = (V) WritableUtils.clone((Writable) value, conf); - } - return KV.of(key, value); - } - @Override - public KV getCurrent() throws NoSuchElementException { + public T getCurrent() throws NoSuchElementException { if (currentPair == null) { throw new NoSuchElementException(); } - return currentPair; + return source.inputConverter().apply(currentPair); } @Override @@ -401,11 +520,27 @@ public class HDFSFileSource extends BoundedSource> { } @Override - public BoundedSource> getCurrentSource() { + public BoundedSource getCurrentSource() { return source; } - // BoundedReader + @SuppressWarnings("unchecked") + private KV nextPair() throws IOException, InterruptedException { + K key = currentReader.getCurrentKey(); + V value = currentReader.getCurrentValue(); + // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue + if (key instanceof Writable) { + key = (K) WritableUtils.clone((Writable) key, conf); + } + if (value instanceof Writable) { + value = (V) WritableUtils.clone((Writable) value, conf); + } + return KV.of(key, value); + } + + // ======================================================================= + // Optional overrides + // ======================================================================= @Override public Double getFractionConsumed() { @@ -437,31 +572,18 @@ public class HDFSFileSource extends BoundedSource> { } } - @Override - public final long getSplitPointsRemaining() { - if (done) { - return 0; - } - // This source does not currently support dynamic work rebalancing, so remaining - // parallelism is always 1. - return 1; - } - - @Override - public BoundedSource> splitAtFraction(double fraction) { - // Not yet supported. To implement this, the sizes of the splits should be used to - // calculate the remaining splits that constitute the given fraction, then a - // new source backed by those splits should be returned. - return null; - } } + // ======================================================================= + // SerializableSplit + // ======================================================================= + /** * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be * serialized using Java's standard serialization mechanisms. Note that the InputSplit * has to be Writable (which most are). 
*/ - public static class SerializableSplit implements Externalizable { + protected static class SerializableSplit implements Externalizable { private static final long serialVersionUID = 0L; private InputSplit split; @@ -496,5 +618,4 @@ public class HDFSFileSource extends BoundedSource> { } } - } http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java index f7b4bff..0772e57 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java @@ -57,11 +57,37 @@ public class SerializableConfiguration implements Externalizable { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - conf = new Configuration(); + conf = new Configuration(false); int size = in.readInt(); for (int i = 0; i < size; i++) { conf.set(in.readUTF(), in.readUTF()); } } + /** + * Returns new configured {@link Job} object. + */ + public static Job newJob(@Nullable SerializableConfiguration conf) throws IOException { + if (conf == null) { + return Job.getInstance(); + } else { + Job job = Job.getInstance(); + for (Map.Entry entry : conf.get()) { + job.getConfiguration().set(entry.getKey(), entry.getValue()); + } + return job; + } + } + + /** + * Returns new populated {@link Configuration} object. + */ + public static Configuration newConfiguration(@Nullable SerializableConfiguration conf) { + if (conf == null) { + return new Configuration(); + } else { + return conf.get(); + } + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java new file mode 100644 index 0000000..fd05a19 --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * {@link UserGroupInformation} helper methods. 
+ */ +public class UGIHelper { + + /** + * Find the most appropriate UserGroupInformation to use. + * @param username the user name, or NULL if none is specified. + * @return the most appropriate UserGroupInformation + */ + public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException { + return UserGroupInformation.getBestUGI(null, username); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java index 637e686..d958cda 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java @@ -43,7 +43,7 @@ import org.apache.hadoop.io.Writable; * } * * - * @param the type of elements handled by this coder + * @param the type of elements handled by this coder. */ public class WritableCoder extends StandardCoder { private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java new file mode 100644 index 0000000..8b9a6d1 --- /dev/null +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertEquals; + +import com.google.common.base.MoreObjects; +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.mapred.AvroKey; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.Sink; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for HDFSFileSinkTest. + */ +public class HDFSFileSinkTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private final String part0 = "part-r-00000"; + private final String foobar = "foobar"; + + private void doWrite(Sink sink, + PipelineOptions options, + Iterable toWrite) throws Exception { + Sink.WriteOperation writeOperation = + (Sink.WriteOperation) sink.createWriteOperation(options); + Sink.Writer writer = writeOperation.createWriter(options); + writer.open(UUID.randomUUID().toString()); + for (T t: toWrite) { + writer.write(t); + } + String writeResult = writer.close(); + writeOperation.finalize(Collections.singletonList(writeResult), options); + } + + @Test + public void testWriteSingleRecord() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink sink = + HDFSFileSink.to( + file.toString(), + SequenceFileOutputFormat.class, + NullWritable.class, + Text.class, + new SerializableFunction>() { + @Override + public KV apply(String input) { + return KV.of(NullWritable.get(), new Text(input)); + } + }); + + doWrite(sink, options, Collections.singletonList(foobar)); + + SequenceFile.Reader.Option opts = + SequenceFile.Reader.file(new Path(file.toString(), part0)); + SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts); + assertEquals(NullWritable.class.getName(), reader.getKeyClassName()); + assertEquals(Text.class.getName(), reader.getValueClassName()); + NullWritable k = NullWritable.get(); + Text v = new Text(); + assertEquals(true, reader.next(k, v)); + assertEquals(NullWritable.get(), k); + assertEquals(new Text(foobar), v); + } + + @Test + public void testToText() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink sink = HDFSFileSink.toText(file.toString()); + + doWrite(sink, options, Collections.singletonList(foobar)); + + List strings = Files.readAllLines(new File(file.toString(), part0).toPath(), + Charset.forName("UTF-8")); + assertEquals(Collections.singletonList(foobar), strings); + } + + @DefaultCoder(AvroCoder.class) + static class GenericClass { + int intField; + String stringField; + public GenericClass() 
{} + public GenericClass(int intValue, String stringValue) { + this.intField = intValue; + this.stringField = stringValue; + } + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("intField", intField) + .add("stringField", stringField) + .toString(); + } + @Override + public int hashCode() { + return Objects.hash(intField, stringField); + } + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof GenericClass)) { + return false; + } + GenericClass o = (GenericClass) other; + return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField); + } + } + + @Test + public void testToAvro() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink, NullWritable> sink = HDFSFileSink.toAvro( + file.toString(), + AvroCoder.of(GenericClass.class), + new Configuration(false)); + + doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar"))); + + GenericDatumReader datumReader = new GenericDatumReader(); + FileReader reader = + DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader); + GenericData.Record next = reader.next(null); + assertEquals("foobar", next.get("stringField").toString()); + assertEquals(3, next.get("intField")); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index 4c3f1ce..ac6af40 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -51,7 +51,7 @@ import org.junit.rules.TemporaryFolder; */ public class HDFSFileSourceTest { - Random random = new Random(0L); + private Random random = new Random(0L); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -62,9 +62,9 @@ public class HDFSFileSourceTest { List> expectedResults = createRandomRecords(3, 10, 0); File file = createFileWithData("tmp.seq", expectedResults); - HDFSFileSource source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); + HDFSFileSource, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); assertEquals(file.length(), source.getEstimatedSizeBytes(null)); @@ -86,13 +86,16 @@ public class HDFSFileSourceTest { List> data4 = createRandomRecords(3, 10, 30); createFileWithData("otherfile", data4); - HDFSFileSource source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); List> expectedResults = new ArrayList<>(); expectedResults.addAll(data1); expectedResults.addAll(data2); expectedResults.addAll(data3); + + HDFSFileSource, IntWritable, Text> source = + HDFSFileSource.from( + new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); } @@ -111,10 +114,12 @@ public class HDFSFileSourceTest { List> data4 = createRandomRecords(3, 10, 
30); createFileWithData("otherfile", data4); - HDFSFileSource source = - HDFSFileSource.from(new File(file1.getParent(), "file*").toString(), + HDFSFileSource, IntWritable, Text> source = + HDFSFileSource.from( + new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); Source.Reader> reader = source.createReader(options); + // Closing an unstarted FilePatternReader should not throw an exception. try { reader.close(); @@ -128,11 +133,11 @@ public class HDFSFileSourceTest { PipelineOptions options = PipelineOptionsFactory.create(); List> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.avro", expectedResults); + File file = createFileWithData("tmp.seq", expectedResults); - HDFSFileSource source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); + HDFSFileSource, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); // Assert that the source produces the expected records assertEquals(expectedResults, readFromSource(source, options)); @@ -158,7 +163,7 @@ public class HDFSFileSourceTest { List> expectedResults = createRandomRecords(3, 10000, 0); File file = createFileWithData("tmp.avro", expectedResults); - HDFSFileSource source = + HDFSFileSource, IntWritable, Text> source = HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); @@ -178,8 +183,8 @@ public class HDFSFileSourceTest { throws IOException { File tmpFile = tmpFolder.newFile(filename); try (Writer writer = SequenceFile.createWriter(new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(tmpFile.toURI())))) { + Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), + Writer.file(new Path(tmpFile.toURI())))) { for (KV record : records) { writer.append(record.getKey(), record.getValue()); @@ -189,7 +194,7 @@ public class HDFSFileSourceTest { } private List> createRandomRecords(int dataItemLength, - int numItems, int offset) { + int numItems, int offset) { List> records = new ArrayList<>(); for (int i = 0; i < numItems; i++) { IntWritable key = new IntWritable(i + offset);
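
----------------------------------------------------------------------
Usage sketch (not part of the patch above): a minimal, illustrative example of how the
refactored factory methods in this commit are meant to be wired into a pipeline via
Read.from and Write.to. The filesystem paths, the user name, and the class name below are
placeholder assumptions rather than values taken from the commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

public class HdfsCopyExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Read lines of text from a Hadoop-supported filesystem using the new
    // HDFSFileSource.fromText(...) factory. withUsername(...) is optional and is shown
    // only to illustrate the UserGroupInformation support added by this change;
    // "hadoop-user" and the paths are placeholders.
    PCollection<String> lines = pipeline.apply(
        Read.from(HDFSFileSource.fromText("hdfs://namenode/input/*")
            .withUsername("hadoop-user")));

    // Write the lines back out with the new HDFSFileSink.toText(...) factory,
    // optionally passing a Hadoop Configuration.
    lines.apply(Write.to(HDFSFileSink.<String>toText("hdfs://namenode/output")
        .withConfiguration(new Configuration(false))));

    pipeline.run();
  }
}
----------------------------------------------------------------------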