beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [3/5] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system
Date Tue, 02 May 2017 19:58:35 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
new file mode 100644
index 0000000..fe2db5f
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+
+/**
+ * A destination that a {@link Write} transform writes elements of type {@code T} to.
+ *
+ * <p>A {@code Sink} creates a {@link WriteOperation}, which in turn creates one {@link Writer}
+ * per bundle; the per-bundle writer results are then handed to
+ * {@link WriteOperation#finalize} once all bundles have been written.
+ *
+ * @param <T> the type of elements written to this sink
+ * @deprecated This class only exists to support HDFSFileSink and should not be used for new
+ *     sinks.
+ */
+@Deprecated
+public abstract class Sink<T> implements Serializable, HasDisplayData {
+  /**
+   * Ensures that the sink is valid and can be written to before the write operation begins. One
+   * should use {@link com.google.common.base.Preconditions} to implement this method.
+   */
+  public abstract void validate(PipelineOptions options);
+
+  /**
+   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
+   */
+  public abstract WriteOperation<T, ?> createWriteOperation();
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this method
+   * to provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {}
+
+  /**
+   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
+   *
+   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
+   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
+   * a bundle to the sink.
+   *
+   * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
+   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
+   *
+   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
+   * call to {@code initialize} and deserialized before calls to {@code createWriter} and
+   * {@code finalize}. However, it is not reserialized after {@code createWriter}, so
+   * {@code createWriter} should not mutate the state of the {@code WriteOperation}.
+   *
+   * @param <T> The type of objects to write
+   * @param <WriteT> The result of a per-bundle write
+   */
+  public abstract static class WriteOperation<T, WriteT> implements Serializable {
+    /**
+     * Performs initialization before writing to the sink. Called before writing begins.
+     */
+    public abstract void initialize(PipelineOptions options) throws Exception;
+
+    /**
+     * Indicates whether the operation will be performing windowed writes.
+     *
+     * @param windowedWrites {@code true} if bundles are written per window and pane
+     */
+    public abstract void setWindowedWrites(boolean windowedWrites);
+
+    /**
+     * Given an Iterable of results from bundle writes, performs finalization after writing and
+     * closes the sink. Called after all bundle writes are complete.
+     *
+     * <p>The results that are passed to finalize are those returned by bundles that completed
+     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
+     * one writer result will be passed to finalize for each bundle. An implementation of finalize
+     * should perform clean up of any failed and successfully retried bundles.  Note that these
+     * failed bundles will not have their writer result passed to finalize, so finalize should be
+     * capable of locating any temporary/partial output written by failed bundles.
+     *
+     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
+     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
+     * failure/retry or for redundancy.
+     *
+     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
+     * finalize is called multiple times.
+     *
+     * @param writerResults an Iterable of results from successful bundle writes.
+     * @param options the pipeline options of the running pipeline
+     */
+    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
+        throws Exception;
+
+    /**
+     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
+     *
+     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
+     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
+     *
+     * <p>Must not mutate the state of the WriteOperation.
+     */
+    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
+
+    /**
+     * Returns the Sink that this write operation writes to.
+     */
+    public abstract Sink<T> getSink();
+
+    /**
+     * Returns a coder for the writer result type.
+     */
+    public abstract Coder<WriteT> getWriterResultCoder();
+  }
+
+  /**
+   * A Writer writes a bundle of elements from a PCollection to a sink.
+   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
+   * and {@link Writer#close} is called after all elements in the bundle have been written.
+   * {@link Writer#write} writes an element to the sink.
+   *
+   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
+   * multiple instances of a Writer may be instantiated in different threads on the same worker.
+   *
+   * @param <T> The type of object to write
+   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
+   */
+  public abstract static class Writer<T, WriteT> {
+    /**
+     * Performs bundle initialization. For example, creates a temporary file for writing or
+     * initializes any state that will be used across calls to {@link Writer#write}.
+     *
+     * <p>The unique id that is given to open should be used to ensure that the writer's output does
+     * not interfere with the output of other Writers, as a bundle may be executed many times for
+     * fault tolerance.
+     *
+     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+     * {@code shard} and {@code numShards} are populated for the case of static sharding. In cases
+     * where the runner is dynamically picking sharding, shard and numShards might both be set to
+     * -1.
+     *
+     * @param uId a unique id for this attempt at writing the bundle
+     * @param window the window the bundle's elements belong to
+     * @param paneInfo the pane of {@code window} being written
+     * @param shard the shard number, or -1 if unknown
+     * @param numShards the total number of shards, or -1 if unknown
+     */
+    public abstract void openWindowed(String uId,
+                                      BoundedWindow window,
+                                      PaneInfo paneInfo,
+                                      int shard,
+                                      int numShards) throws Exception;
+
+    /**
+     * Perform bundle initialization for the case where the file is written unwindowed.
+     *
+     * @param uId a unique id for this attempt at writing the bundle
+     * @param shard the shard number, or -1 if unknown
+     * @param numShards the total number of shards, or -1 if unknown
+     */
+    public abstract void openUnwindowed(String uId,
+                                        int shard,
+                                        int numShards) throws Exception;
+
+    /**
+     * Discards the output of a bundle whose write failed.
+     *
+     * <p>{@link Write} calls this after a failure while writing or closing a sharded bundle, so
+     * implementations should remove any temporary/partial output created by this writer.
+     */
+    public abstract void cleanup() throws Exception;
+
+    /**
+     * Called for each value in the bundle.
+     */
+    public abstract void write(T value) throws Exception;
+
+    /**
+     * Finishes writing the bundle. Closes any resources used for writing the bundle.
+     *
+     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
+     * finalization. The result should contain some way to identify the output of this bundle (using
+     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
+     * successful writes.
+     *
+     * @return the writer result
+     */
+    public abstract WriteT close() throws Exception;
+
+    /**
+     * Returns the write operation this writer belongs to.
+     */
+    public abstract WriteOperation<T, WriteT> getWriteOperation();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
new file mode 100644
index 0000000..fd05a19
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Static helpers for obtaining a Hadoop {@link UserGroupInformation}.
+ */
+public class UGIHelper {
+
+  /**
+   * Returns the most appropriate {@link UserGroupInformation} for the given user name.
+   *
+   * <p>Hadoop's {@code getBestUGI} is always called with a {@code null} first argument (its
+   * ticket-cache path), so only {@code username} influences the choice.
+   *
+   * @param username the user name, or {@code null} if none is specified
+   * @return the most appropriate UserGroupInformation
+   * @throws IOException propagated from Hadoop when the user cannot be resolved
+   */
+  public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
+    final String noTicketCachePath = null;
+    return UserGroupInformation.getBestUGI(noTicketCachePath, username);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
new file mode 100644
index 0000000..86a9246
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
+import org.apache.beam.sdk.io.hdfs.Sink.Writer;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is deprecated, and only exists currently for HDFSFileSink.
+ */
+@Deprecated
+public class Write<T> extends PTransform<PCollection<T>, PDone> {
+  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+  private static final int UNKNOWN_SHARDNUM = -1;
+  private static final int UNKNOWN_NUMSHARDS = -1;
+
+  private final Sink<T> sink;
+  // This allows the number of shards to be dynamically computed based on the input
+  // PCollection.
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+  // We don't use a side input for static sharding, as we want this value to be updatable
+  // when a pipeline is updated.
+  @Nullable
+  private final ValueProvider<Integer> numShardsProvider;
+  private boolean windowedWrites;
+
+  /**
+   * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
+   * control how many different shards are produced.
+   */
+  public static <T> Write<T> to(Sink<T> sink) {
+    checkNotNull(sink, "sink");
+    return new Write<>(sink, null /* runner-determined sharding */, null, false);
+  }
+
+  /**
+   * Creates a {@link Write}. The factory methods set at most one of {@code computeNumShards} and
+   * {@code numShardsProvider}; leaving both null means runner-determined sharding.
+   */
+  private Write(
+      Sink<T> sink,
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+      @Nullable ValueProvider<Integer> numShardsProvider,
+      boolean windowedWrites) {
+    this.windowedWrites = windowedWrites;
+    this.numShardsProvider = numShardsProvider;
+    this.computeNumShards = computeNumShards;
+    this.sink = sink;
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+        "%s can only be applied to an unbounded PCollection if doing windowed writes",
+        Write.class.getSimpleName());
+    return createWrite(input, sink.createWriteOperation());
+  }
+
+  /**
+   * Delegates validation to the wrapped {@link Sink}.
+   */
+  @Override
+  public void validate(PipelineOptions options) {
+    Sink<T> target = this.sink;
+    target.validate(options);
+  }
+
+  /**
+   * Registers the sink class and the sink's own display data, plus either the sharding transform
+   * or the fixed shard count when one is configured.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+        .include("sink", sink);
+
+    PTransform<PCollection<T>, PCollectionView<Integer>> sharding = getSharding();
+    if (sharding != null) {
+      builder.include("sharding", sharding);
+      return;
+    }
+
+    ValueProvider<Integer> fixedShards = getNumShards();
+    if (fixedShards == null) {
+      return;
+    }
+    String numShards;
+    if (fixedShards.isAccessible()) {
+      numShards = fixedShards.get().toString();
+    } else {
+      numShards = fixedShards.toString();
+    }
+    builder.add(DisplayData.item("numShards", numShards)
+        .withLabel("Fixed Number of Shards"));
+  }
+
+  /**
+   * Returns the {@link Sink} this transform writes to.
+   */
+  public Sink<T> getSink() {
+    return this.sink;
+  }
+
+  /**
+   * Returns the {@link PTransform} that computes the number of shards, as set by
+   * {@link #withSharding(PTransform)}, or {@code null} when sharding is static
+   * ({@link #withNumShards(int)}) or runner-determined
+   * ({@link #withRunnerDeterminedSharding()}).
+   */
+  @Nullable
+  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+    return this.computeNumShards;
+  }
+
+  /**
+   * Returns the fixed number of shards set via {@link #withNumShards(ValueProvider)} (or a
+   * positive {@link #withNumShards(int)}), or {@code null} when the shard count is computed by
+   * {@link #withSharding(PTransform)} or determined by the runner.
+   */
+  @Nullable
+  public ValueProvider<Integer> getNumShards() {
+    return numShardsProvider;
+  }
+
+  /**
+   * Returns a new {@link Write} that writes to the current {@link Sink} using exactly
+   * {@code numShards} shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance.
+   *
+   * <p>A value less than or equal to 0 is equivalent to {@link #withRunnerDeterminedSharding()}.
+   */
+  public Write<T> withNumShards(int numShards) {
+    if (numShards <= 0) {
+      return withRunnerDeterminedSharding();
+    }
+    return withNumShards(StaticValueProvider.of(numShards));
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * {@link ValueProvider} specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+    return new Write<>(sink, null, numShardsProvider, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * specified {@link PTransform} to compute the number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+    checkNotNull(
+        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+    return new Write<>(sink, sharding, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} with
+   * runner-determined sharding.
+   */
+  public Write<T> withRunnerDeterminedSharding() {
+    return new Write<>(sink, null, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that writes preserves windowing on it's input.
+   *
+   * <p>If this option is not specified, windowing and triggering are replaced by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
+   *
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, then more than a single output shard might be
+   * generated multiple times; it's up to the sink implementation to keep these output shards
+   * unique.
+   *
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+   * positive value.
+   */
+  public Write<T> withWindowedWrites() {
+    return new Write<>(sink, computeNumShards, numShardsProvider, true);
+  }
+
+  /**
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link Sink}.
+   *
+   * <p>The writer is created and opened lazily on the first element of a bundle (with a fresh
+   * random id and unknown shard number/count) and closed in {@code finishBundle}, which emits
+   * one writer result per bundle.
+   */
+  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+    // Writer for the records in the current bundle. Lazily initialized in processElement and
+    // reset to null once it has been closed (successfully or not).
+    private Writer<T, WriteT> writer = null;
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+      this.writeOperationView = writeOperationView;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      // Lazily initialize the Writer on the first element of the bundle.
+      if (writer == null) {
+        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter(c.getPipelineOptions());
+
+        if (windowedWrites) {
+          // NOTE(review): the writer is opened with the window/pane of the first element only.
+          // createWrite rejects windowed writes without explicit sharding, so this branch appears
+          // unreachable from this class.
+          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
+              UNKNOWN_NUMSHARDS);
+        } else {
+          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+        }
+        // Log the operation itself (not its PCollectionView), matching the info message above.
+        LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
+      }
+      try {
+        writer.write(c.element());
+      } catch (Exception e) {
+        // Discard the write result: close the writer, but never let a close failure mask the
+        // exception that caused the write to fail.
+        try {
+          writer.close();
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
+          }
+          e.addSuppressed(closeException);
+        } finally {
+          // The writer is closed (or failed to close); drop it so that a reused instance opens a
+          // fresh writer instead of writing to, or closing again, this one.
+          writer = null;
+        }
+        throw e;
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (writer != null) {
+        // Reset state before closing, so a failing close() does not leave a stale writer behind
+        // in case this instance is reused.
+        Writer<T, WriteT> toClose = writer;
+        writer = null;
+        WriteT result = toClose.close();
+        c.output(result);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
+    }
+  }
+
+  /**
+   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+   * a single iterable.
+   *
+   * @see WriteBundles
+   */
+  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+    private final PCollectionView<Integer> numShardsView;
+
+    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
+                        PCollectionView<Integer> numShardsView) {
+      this.writeOperationView = writeOperationView;
+      this.numShardsView = numShardsView;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
+      // In a sharded write, single input element represents one shard. We can open and close
+      // the writer in each call to processElement.
+      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+      LOG.info("Opening writer for write operation {}", writeOperation);
+      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+      if (windowedWrites) {
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
+            numShards);
+      } else {
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+      }
+      LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+      try {
+        try {
+          for (T t : c.element().getValue()) {
+            writer.write(t);
+          }
+        } catch (Exception e) {
+          try {
+            writer.close();
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
+        }
+
+        // Close the writer; if this throws let the error propagate.
+        WriteT result = writer.close();
+        c.output(result);
+      } catch (Exception e) {
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
+        throw e;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
+    }
+  }
+
+  /**
+   * Keys each element with a shard number in {@code [0, shardCount)}, assigned round-robin from
+   * a random starting shard chosen per instance.
+   *
+   * <p>The shard count is read from {@code numShardsView} when it is non-null, otherwise from
+   * {@code numShardsProvider}; it must be positive.
+   */
+  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+    private final PCollectionView<Integer> numShardsView;
+    private final ValueProvider<Integer> numShardsProvider;
+    // Last shard key emitted by this instance, or UNKNOWN_SHARDNUM before the first element.
+    private int shardNumber;
+
+    ApplyShardingKey(PCollectionView<Integer> numShardsView,
+                     ValueProvider<Integer> numShardsProvider) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      this.shardNumber = UNKNOWN_SHARDNUM;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      int shardCount = resolveShardCount(context);
+      checkArgument(
+          shardCount > 0,
+          "Must have a positive number of shards specified for non-runner-determined sharding."
+              + " Got %s",
+          shardCount);
+      // The random first key desynchronizes instances, so records in a small PCollection are
+      // statistically balanced across shards.
+      shardNumber = shardNumber == UNKNOWN_SHARDNUM
+          ? ThreadLocalRandom.current().nextInt(shardCount)
+          : (shardNumber + 1) % shardCount;
+      context.output(KV.of(shardNumber, context.element()));
+    }
+
+    /** Reads the shard count from the side input when present, else from the provider. */
+    private int resolveShardCount(ProcessContext context) {
+      if (numShardsView != null) {
+        return context.sideInput(numShardsView);
+      }
+      return checkNotNull(numShardsProvider).get();
+    }
+  }
+
+  /**
+   * A write is performed as sequence of three {@link ParDo}'s.
+   *
+   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
+   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
+   * called. The output of this ParDo is a singleton PCollection
+   * containing the WriteOperation.
+   *
+   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+   * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+   * every element in the bundle. The output of this ParDo is a PCollection of
+   * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
+   * each bundle.
+   *
+   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
+   * the collection of writer results as a side-input. In this ParDo,
+   * {@link WriteOperation#finalize} is called to finalize the write.
+   *
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+   * before the exception that caused the write to fail is propagated and the write result will be
+   * discarded.
+   *
+   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
+   * deserialized in the bundle-writing and finalization phases, any state change to the
+   * WriteOperation object that occurs during initialization is visible in the latter phases.
+   * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
+   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+   * WriteOperation).
+   */
+  private <WriteT> PDone createWrite(
+      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
+    Pipeline p = input.getPipeline();
+    writeOperation.setWindowedWrites(windowedWrites);
+
+    // A coder to use for the WriteOperation.
+    @SuppressWarnings("unchecked")
+    Coder<WriteOperation<T, WriteT>> operationCoder =
+        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
+
+    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
+    // the sink.
+    PCollection<WriteOperation<T, WriteT>> operationCollection =
+        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
+
+    // Initialize the resource in a do-once ParDo on the WriteOperation.
+    operationCollection = operationCollection
+        .apply("Initialize", ParDo.of(
+            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) throws Exception {
+                WriteOperation<T, WriteT> writeOperation = c.element();
+                LOG.info("Initializing write operation {}", writeOperation);
+                writeOperation.initialize(c.getPipelineOptions());
+                writeOperation.setWindowedWrites(windowedWrites);
+                LOG.debug("Done initializing write operation {}", writeOperation);
+                // The WriteOperation is also the output of this ParDo, so it can have mutable
+                // state.
+                c.output(writeOperation);
+              }
+            }))
+        .setCoder(operationCoder);
+
+    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
+    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
+        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing triggers.
+      input =
+          input.apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
+
+
+    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
+    // as a side input) and collect the results of the writes in a PCollection.
+    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
+    // as a side input), so this will happen after the initial ParDo.
+    PCollection<WriteT> results;
+    final PCollectionView<Integer> numShardsView;
+    if (computeNumShards == null && numShardsProvider == null) {
+      if (windowedWrites) {
+        throw new IllegalStateException("When doing windowed writes, numShards must be set"
+            + "explicitly to a positive value");
+      }
+      numShardsView = null;
+      results = input
+          .apply("WriteBundles",
+              ParDo.of(new WriteBundles<>(writeOperationView))
+                  .withSideInputs(writeOperationView));
+    } else {
+      if (computeNumShards != null) {
+        numShardsView = input.apply(computeNumShards);
+        results  = input
+            .apply("ApplyShardLabel", ParDo.of(
+                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
+                    .withSideInputs(numShardsView, writeOperationView));
+      } else {
+        numShardsView = null;
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
+                    .withSideInputs(writeOperationView));
+      }
+    }
+    results.setCoder(writeOperation.getWriterResultCoder());
+
+    if (windowedWrites) {
+      // When processing streaming windowed writes, results will arrive multiple times. This
+      // means we can't share the below implementation that turns the results into a side input,
+      // as new data arriving into a side input does not trigger the listening DoFn. Instead
+      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
+      // whenever new data arrives.
+      PCollection<KV<Void, WriteT>> keyedResults =
+          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
+      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
+          .getWriterResultCoder()));
+
+      // Is the continuation trigger sufficient?
+      keyedResults
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
+          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.element().getValue());
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
+            }
+          }).withSideInputs(writeOperationView));
+    } else {
+      final PCollectionView<Iterable<WriteT>> resultsView =
+          results.apply(View.<WriteT>asIterable());
+      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+      if (numShardsView != null) {
+        sideInputs.add(numShardsView);
+      }
+
+      // Finalize the write in another do-once ParDo on the singleton collection containing the
+      // Writer. The results from the per-bundle writes are given as an Iterable side input.
+      // The WriteOperation's state is the same as after its initialization in the first do-once
+      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
+      // results collection as a side input), so it will happen after the parallel write.
+      // For the non-windowed case, we guarantee that  if no data is written but the user has
+      // set numShards, then all shards will be written out as empty files. For this reason we
+      // use a side input here.
+      operationCollection
+          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.element();
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor user-specified numShards if
+              // set.
+              int minShardsNeeded;
+              if (numShardsView != null) {
+                minShardsNeeded = c.sideInput(numShardsView);
+              } else if (numShardsProvider != null) {
+                minShardsNeeded = numShardsProvider.get();
+              } else {
+                minShardsNeeded = 1;
+              }
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written for a total of "
+                        + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
+                      UNKNOWN_NUMSHARDS);
+                  WriteT emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
+              }
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
+            }
+          }).withSideInputs(sideInputs.build()));
+    }
+    return PDone.in(input.getPipeline());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
new file mode 100644
index 0000000..763b30a
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms used to read from and write to the Hadoop file system (HDFS).
+ */
+package org.apache.beam.sdk.io.hdfs;

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
new file mode 100644
index 0000000..9fa6606
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.MoreObjects;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for HDFSFileSinkTest.
+ */
+public class HDFSFileSinkTest {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private final String part0 = "part-r-00000";
+  private final String foobar = "foobar";
+
+  private <T> void doWrite(Sink<T> sink,
+                           PipelineOptions options,
+                           Iterable<T> toWrite) throws Exception {
+    Sink.WriteOperation<T, String> writeOperation =
+        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
+    Sink.Writer<T, String> writer = writeOperation.createWriter(options);
+    writer.openUnwindowed(UUID.randomUUID().toString(),  -1, -1);
+    for (T t: toWrite) {
+      writer.write(t);
+    }
+    String writeResult = writer.close();
+    writeOperation.finalize(Collections.singletonList(writeResult), options);
+  }
+
+  @Test
+  public void testWriteSingleRecord() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<String, NullWritable, Text> sink =
+        HDFSFileSink.to(
+            file.toString(),
+            SequenceFileOutputFormat.class,
+            NullWritable.class,
+            Text.class,
+            new SerializableFunction<String, KV<NullWritable, Text>>() {
+              @Override
+              public KV<NullWritable, Text> apply(String input) {
+                return KV.of(NullWritable.get(), new Text(input));
+              }
+            });
+
+    doWrite(sink, options, Collections.singletonList(foobar));
+
+    SequenceFile.Reader.Option opts =
+        SequenceFile.Reader.file(new Path(file.toString(), part0));
+    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
+    assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
+    assertEquals(Text.class.getName(), reader.getValueClassName());
+    NullWritable k = NullWritable.get();
+    Text v = new Text();
+    assertEquals(true, reader.next(k, v));
+    assertEquals(NullWritable.get(), k);
+    assertEquals(new Text(foobar), v);
+  }
+
+  @Test
+  public void testToText() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
+
+    doWrite(sink, options, Collections.singletonList(foobar));
+
+    List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
+        Charset.forName("UTF-8"));
+    assertEquals(Collections.singletonList(foobar), strings);
+  }
+
+  @DefaultCoder(AvroCoder.class)
+  static class GenericClass {
+    int intField;
+    String stringField;
+    public GenericClass() {}
+    public GenericClass(int intValue, String stringValue) {
+      this.intField = intValue;
+      this.stringField = stringValue;
+    }
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("intField", intField)
+          .add("stringField", stringField)
+          .toString();
+    }
+    @Override
+    public int hashCode() {
+      return Objects.hash(intField, stringField);
+    }
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof GenericClass)) {
+        return false;
+      }
+      GenericClass o = (GenericClass) other;
+      return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
+    }
+  }
+
+  @Test
+  public void testToAvro() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
+        file.toString(),
+        AvroCoder.of(GenericClass.class),
+        new Configuration(false));
+
+    doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
+
+    GenericDatumReader datumReader = new GenericDatumReader();
+    FileReader<GenericData.Record> reader =
+        DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
+    GenericData.Record next = reader.next(null);
+    assertEquals("foobar", next.get("stringField").toString());
+    assertEquals(3, next.get("intField"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
new file mode 100644
index 0000000..a964239
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for HDFSFileSource.
+ */
+public class HDFSFileSourceTest {
+
+  private Random random = new Random(0L);
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testFullyReadSingleFile() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+    File file = createFileWithData("tmp.seq", expectedResults);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+            HDFSFileSource.from(
+                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testFullyReadSingleFileWithSpaces() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+    File file = createFileWithData("tmp data.seq", expectedResults);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+            HDFSFileSource.from(
+                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testFullyReadFilePattern() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+    expectedResults.addAll(data1);
+    expectedResults.addAll(data2);
+    expectedResults.addAll(data3);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testCloseUnstartedFilePatternReader() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            new File(file1.getParent(), "file*").toString(),
+            SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+
+    // Closing an unstarted FilePatternReader should not throw an exception.
+    try {
+      reader.close();
+    } catch (Exception e) {
+      fail("Closing an unstarted FilePatternReader should not throw an exception");
+    }
+  }
+
+  @Test
+  public void testSplits() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.seq", expectedResults);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    // Assert that the source produces the expected records
+    assertEquals(expectedResults, readFromSource(source, options));
+
+    // Split with a small bundle size (has to be at least size of sync interval)
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+        .split(SequenceFile.SYNC_INTERVAL, options);
+    assertTrue(splits.size() > 2);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+    int nonEmptySplits = 0;
+    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertTrue(nonEmptySplits > 2);
+  }
+
+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
+  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+      throws IOException {
+    File tmpFile = tmpFolder.newFile(filename);
+    try (Writer writer = SequenceFile.createWriter(new Configuration(),
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(tmpFile.toURI())))) {
+
+      for (KV<IntWritable, Text> record : records) {
+        writer.append(record.getKey(), record.getValue());
+      }
+    }
+    return tmpFile;
+  }
+
+  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+                                                          int numItems, int offset) {
+    List<KV<IntWritable, Text>> records = new ArrayList<>();
+    for (int i = 0; i < numItems; i++) {
+      IntWritable key = new IntWritable(i + offset);
+      Text value = new Text(createRandomString(dataItemLength));
+      records.add(KV.of(key, value));
+    }
+    return records;
+  }
+
+  private String createRandomString(int length) {
+    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      builder.append(chars[random.nextInt(chars.length)]);
+    }
+    return builder.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
new file mode 100644
index 0000000..6963116
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for {@link HadoopFileSystemModule}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemModuleTest {
+  /** The module should be among those returned by {@code ObjectMapper.findModules}. */
+  @Test
+  public void testObjectMapperIsAbleToFindModule() throws Exception {
+    List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
+    assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
+  }
+
+  /**
+   * A Configuration round-tripped through JSON should keep both its own properties and those
+   * inherited from an added resource, with the explicitly set value overriding the base value.
+   */
+  @Test
+  public void testConfigurationSerializationDeserialization() throws Exception {
+    Configuration baseConfiguration = new Configuration(false);
+    baseConfiguration.set("testPropertyA", "baseA");
+    baseConfiguration.set("testPropertyC", "baseC");
+    Configuration configuration = new Configuration(false);
+    configuration.addResource(baseConfiguration);
+    configuration.set("testPropertyA", "A");
+    configuration.set("testPropertyB", "B");
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new HadoopFileSystemModule());
+    String serializedConfiguration = objectMapper.writeValueAsString(configuration);
+    Configuration deserializedConfiguration =
+        objectMapper.readValue(serializedConfiguration, Configuration.class);
+    // Configuration does not promise an iteration order, so compare entries as a set.
+    assertThat(
+        deserializedConfiguration,
+        Matchers.<Map.Entry<String, String>>containsInAnyOrder(
+            new AbstractMap.SimpleEntry<String, String>("testPropertyA", "A"),
+            new AbstractMap.SimpleEntry<String, String>("testPropertyB", "B"),
+            new AbstractMap.SimpleEntry<String, String>("testPropertyC", "baseC")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
new file mode 100644
index 0000000..2be3d93
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemOptionsRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsRegistrarTest {
+
+  /**
+   * Verifies that {@link HadoopFileSystemOptionsRegistrar} is discoverable through
+   * {@link ServiceLoader} and registers exactly {@link HadoopFileSystemOptions}.
+   */
+  @Test
+  public void testServiceLoader() {
+    Iterable<PipelineOptionsRegistrar> registrars =
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator());
+    for (PipelineOptionsRegistrar candidate : registrars) {
+      if (!(candidate instanceof HadoopFileSystemOptionsRegistrar)) {
+        continue;
+      }
+      // The first matching registrar decides the outcome.
+      assertThat(candidate.getPipelineOptions(),
+          Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
+      return;
+    }
+    fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
new file mode 100644
index 0000000..634528b
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemOptions}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsTest {
+  /**
+   * Parses a JSON list of two single-property objects passed via {@code --hdfsConfiguration} and
+   * checks that each object yields one configuration whose only entry is that property.
+   */
+  @Test
+  public void testParsingHdfsConfiguration() {
+    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
+        "--hdfsConfiguration=["
+            + "{\"propertyA\": \"A\"},"
+            + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
+    assertEquals(2, options.getHdfsConfiguration().size());
+    // Parameterize the expected entries explicitly; a raw SimpleEntry causes unchecked warnings.
+    assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<String, String>("propertyA", "A")));
+    assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
+        new AbstractMap.SimpleEntry<String, String>("propertyB", "B")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
new file mode 100644
index 0000000..96f7102
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.net.URI;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemRegistrarTest {
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  private Configuration configuration;
+  private MiniDFSCluster hdfsCluster;
+  private URI hdfsClusterBaseUri;
+
+  /**
+   * Builds a {@link MiniDFSCluster} whose base directory is a temporary folder and records the
+   * cluster's base URI, read from {@code fs.defaultFS} after the cluster has been built.
+   */
+  @Before
+  public void setUp() throws Exception {
+    configuration = new Configuration();
+    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+    hdfsCluster = builder.build();
+    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+  }
+
+  /**
+   * Shuts the cluster down. JUnit runs {@code @After} methods even when {@code @Before} fails,
+   * so skip shutdown when the cluster was never built. Otherwise a spurious
+   * {@link NullPointerException} would be reported alongside the real setup failure.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (hdfsCluster != null) {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  /**
+   * Passes the cluster configuration through {@link HadoopFileSystemOptions} and locates
+   * {@link HadoopFileSystemRegistrar} via {@link ServiceLoader}. Checks that the registrar
+   * produces exactly one {@link HadoopFileSystem} whose scheme matches the cluster URI.
+   */
+  @Test
+  public void testServiceLoader() {
+    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(ImmutableList.of(configuration));
+    for (FileSystemRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
+      if (registrar instanceof HadoopFileSystemRegistrar) {
+        Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
+        assertEquals(hdfsClusterBaseUri.getScheme(),
+            ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
+        return;
+      }
+    }
+    fail("Expected to find " + HadoopFileSystemRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
new file mode 100644
index 0000000..cf86c36
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.ByteStreams;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystem}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private Configuration configuration;
+  private MiniDFSCluster hdfsCluster;
+  private URI hdfsClusterBaseUri;
+  private HadoopFileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    configuration = new Configuration();
+    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+    hdfsCluster = builder.build();
+    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+    fileSystem = new HadoopFileSystem(configuration);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  @Test
+  public void testCreateAndReadFile() throws Exception {
+    create("testFile", "testData".getBytes());
+    assertArrayEquals("testData".getBytes(), read("testFile"));
+  }
+
+  @Test
+  public void testCopy() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    fileSystem.copy(
+        ImmutableList.of(
+            testPath("testFileA"),
+            testPath("testFileB")),
+        ImmutableList.of(
+            testPath("copyTestFileA"),
+            testPath("copyTestFileB")));
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+    assertArrayEquals("testDataA".getBytes(), read("copyTestFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("copyTestFileB"));
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    create("testFileC", "testDataC".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+    assertArrayEquals("testDataC".getBytes(), read("testFileC"));
+
+    fileSystem.delete(ImmutableList.of(
+        testPath("testFileA"),
+        testPath("testFileC")));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("testFile*").toString()));
+    assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of(
+        Metadata.builder()
+            .setResourceId(testPath("testFileB"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataB".getBytes().length)
+            .build()))));
+  }
+
+  @Test
+  public void testMatch() throws Exception {
+    create("testFileAA", "testDataAA".getBytes());
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataAA".getBytes(), read("testFileAA"));
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("testFileA*").toString()));
+    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+        Metadata.builder()
+            .setResourceId(testPath("testFileAA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataAA".getBytes().length)
+            .build(),
+        Metadata.builder()
+            .setResourceId(testPath("testFileA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataA".getBytes().length)
+            .build()));
+  }
+
+  @Test
+  public void testRename() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+    fileSystem.rename(
+        ImmutableList.of(
+            testPath("testFileA"), testPath("testFileB")),
+        ImmutableList.of(
+            testPath("renameFileA"), testPath("renameFileB")));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("*").toString()));
+    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+        Metadata.builder()
+            .setResourceId(testPath("renameFileA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataA".getBytes().length)
+            .build(),
+        Metadata.builder()
+            .setResourceId(testPath("renameFileB"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataB".getBytes().length)
+            .build()));
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("renameFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("renameFileB"));
+  }
+
+  @Test
+  public void testMatchNewResource() throws Exception {
+    // match file spec
+    assertEquals(testPath("file"),
+        fileSystem.matchNewResource(testPath("file").toString(), false));
+    // match dir spec missing '/'
+    assertEquals(testPath("dir/"),
+        fileSystem.matchNewResource(testPath("dir").toString(), true));
+    // match dir spec with '/'
+    assertEquals(testPath("dir/"),
+        fileSystem.matchNewResource(testPath("dir/").toString(), true));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Expected file path but received directory path");
+    fileSystem.matchNewResource(testPath("dir/").toString(), false);
+  }
+
+  @Test
+  @Ignore("TestPipeline needs a way to take in HadoopFileSystemOptions")
+  public void testReadPipeline() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    create("testFileC", "testDataC".getBytes());
+
+    HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
+        .as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
+    FileSystems.setDefaultConfigInWorkers(options);
+    PCollection<String> pc = p.apply(
+        TextIO.read().from(testPath("testFile*").toString()));
+    PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
+    p.run();
+  }
+
+  private void create(String relativePath, byte[] contents) throws Exception {
+    try (WritableByteChannel channel = fileSystem.create(
+        testPath(relativePath),
+        StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) {
+      channel.write(ByteBuffer.wrap(contents));
+    }
+  }
+
+  private byte[] read(String relativePath) throws Exception {
+    try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) {
+      return ByteStreams.toByteArray(Channels.newInputStream(channel));
+    }
+  }
+
+  private HadoopResourceId testPath(String relativePath) {
+    return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md
deleted file mode 100644
index 3a734f2..0000000
--- a/sdks/java/io/hdfs/README.md
+++ /dev/null
@@ -1,43 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-# HDFS IO
-
-This library provides HDFS sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Apache Beam pipelines.
-
-Currently, only the read path is implemented. An `HDFSFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-An `HDFSFileSource` can be read using the
-`org.apache.beam.sdk.io.Read` transform. For example:
-
-```java
-HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
-  MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
-  MyInputFormat.class, MyKey.class, MyValue.class));
-```

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
deleted file mode 100644
index daa3b26..0000000
--- a/sdks/java/io/hdfs/pom.xml
+++ /dev/null
@@ -1,195 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-sdks-java-io-hdfs</artifactId>
-  <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name>
-  <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemPropertyVariables>
-            <beamUseDummyRunner>false</beamUseDummyRunner>
-          </systemPropertyVariables>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-  <properties>
-    <!--
-      This is the version of Hadoop used to compile the hadoop-common module.
-      This dependency is defined with a provided scope.
-      Users must supply their own Hadoop version at runtime.
-    -->
-    <hadoop.version>2.7.3</hadoop.version>
-  </properties>
-
-  <dependencyManagement>
-    <!--
-       We define dependencies here instead of sdks/java/io because
-       of a version mismatch between this Hadoop version and other
-       Hadoop versions declared in other io submodules.
-    -->
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <classifier>tests</classifier>
-        <version>${hadoop.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-minicluster</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.auto.value</groupId>
-      <artifactId>auto-value</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>${avro.version}</version>
-      <classifier>hadoop2</classifier>
-      <exclusions>
-        <!-- exclude old Jetty version of servlet API -->
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- test dependencies -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>


Mime
View raw message