hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject hadoop git commit: HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)
Date Thu, 15 Jun 2017 17:59:17 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 7d81b0bea -> abac844c9


HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)

(cherry picked from commit 5fbec46525d6d49837d934556b59ba77bd2301a8)

    Conflicts:
    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/abac844c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/abac844c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/abac844c

Branch: refs/heads/branch-2
Commit: abac844c90b5c9bff9f239032b96fd8e1e04dc15
Parents: 7d81b0b
Author: Lei Xu <lei@apache.org>
Authored: Wed Jun 14 23:17:53 2017 -0700
Committer: Lei Xu <lei@apache.org>
Committed: Thu Jun 15 10:59:11 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/FSDataOutputStreamBuilder.java    | 171 ++++++++++++++-----
 .../java/org/apache/hadoop/fs/FileSystem.java   |  31 +++-
 .../org/apache/hadoop/fs/FilterFileSystem.java  |   4 +-
 .../org/apache/hadoop/fs/HarFileSystem.java     |   4 +-
 .../apache/hadoop/fs/TestLocalFileSystem.java   |  10 +-
 .../hadoop/hdfs/DistributedFileSystem.java      | 150 ++++++++++++++--
 .../hdfs/server/balancer/NameNodeConnector.java |   5 +-
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  80 +++++++--
 .../namenode/TestFavoredNodesEndToEnd.java      |   2 +-
 9 files changed, 370 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index 55836cc..0527202 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -18,36 +18,70 @@
 package org.apache.hadoop.fs;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.util.EnumSet;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 
-/** Base of specific file system FSDataOutputStreamBuilder. */
+/**
+ * Builder for {@link FSDataOutputStream} and its subclasses.
+ *
+ * It is used to create {@link FSDataOutputStream} when creating a new file or
+ * appending an existing file on {@link FileSystem}.
+ *
+ * By default, it does not create parent directory that do not exist.
+ * {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
+ * Progressable)}.
+ *
+ * To create missing parent directory, use {@link #recursive()}.
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class FSDataOutputStreamBuilder {
-  private Path path = null;
+public abstract class FSDataOutputStreamBuilder
+    <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
+  private final FileSystem fs;
+  private final Path path;
   private FsPermission permission = null;
-  private Integer bufferSize;
-  private Short replication;
-  private Long blockSize;
+  private int bufferSize;
+  private short replication;
+  private long blockSize;
+  /** set to true to create missing directory. */
+  private boolean recursive = false;
+  private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
   private Progressable progress = null;
-  private EnumSet<CreateFlag> flags = null;
   private ChecksumOpt checksumOpt = null;
 
-  private final FileSystem fs;
-
-  protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+  /**
+   * Return the concrete implementation of the builder instance.
+   */
+  protected abstract B getThisBuilder();
+
+  /**
+   * Constructor.
+   */
+  protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
+      @Nonnull Path p) {
+    Preconditions.checkNotNull(fileSystem);
+    Preconditions.checkNotNull(p);
     fs = fileSystem;
     path = p;
+    bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+        IO_FILE_BUFFER_SIZE_DEFAULT);
+    replication = fs.getDefaultReplication(path);
+    blockSize = fs.getDefaultBlockSize(p);
+  }
+
+  protected FileSystem getFS() {
+    return fs;
   }
 
   protected Path getPath() {
@@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
 
   protected FsPermission getPermission() {
     if (permission == null) {
-      return FsPermission.getFileDefault();
+      permission = FsPermission.getFileDefault();
     }
     return permission;
   }
 
-  public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+  /**
+   * Set permission for the file.
+   */
+  public B permission(@Nonnull final FsPermission perm) {
     Preconditions.checkNotNull(perm);
     permission = perm;
-    return this;
+    return getThisBuilder();
   }
 
   protected int getBufferSize() {
-    if (bufferSize == null) {
-      return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
-          IO_FILE_BUFFER_SIZE_DEFAULT);
-    }
     return bufferSize;
   }
 
-  public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+  /**
+   * Set the size of the buffer to be used.
+   */
+  public B bufferSize(int bufSize) {
     bufferSize = bufSize;
-    return this;
+    return getThisBuilder();
   }
 
   protected short getReplication() {
-    if (replication == null) {
-      return fs.getDefaultReplication(getPath());
-    }
     return replication;
   }
 
-  public FSDataOutputStreamBuilder setReplication(short replica) {
+  /**
+   * Set replication factor.
+   */
+  public B replication(short replica) {
     replication = replica;
-    return this;
+    return getThisBuilder();
   }
 
   protected long getBlockSize() {
-    if (blockSize == null) {
-      return fs.getDefaultBlockSize(getPath());
-    }
     return blockSize;
   }
 
-  public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+  /**
+   * Set block size.
+   */
+  public B blockSize(long blkSize) {
     blockSize = blkSize;
-    return this;
+    return getThisBuilder();
+  }
+
+  /**
+   * Return true to create the parent directories if they do not exist.
+   */
+  protected boolean isRecursive() {
+    return recursive;
+  }
+
+  /**
+   * Create the parent directory if they do not exist.
+   */
+  public B recursive() {
+    recursive = true;
+    return getThisBuilder();
   }
 
   protected Progressable getProgress() {
     return progress;
   }
 
-  public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+  /**
+   * Set the facility of reporting progress.
+   */
+  public B progress(@Nonnull final Progressable prog) {
     Preconditions.checkNotNull(prog);
     progress = prog;
-    return this;
+    return getThisBuilder();
   }
 
   protected EnumSet<CreateFlag> getFlags() {
-    if (flags == null) {
-      return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-    }
     return flags;
   }
 
-  public FSDataOutputStreamBuilder setFlags(
-      final EnumSet<CreateFlag> enumFlags) {
-    Preconditions.checkNotNull(enumFlags);
-    flags = enumFlags;
-    return this;
+  /**
+   * Create an FSDataOutputStream at the specified path.
+   */
+  public B create() {
+    flags.add(CreateFlag.CREATE);
+    return getThisBuilder();
+  }
+
+  /**
+   * Set to true to overwrite the existing file.
+   * Set it to false, an exception will be thrown when calling {@link #build()}
+   * if the file exists.
+   */
+  public B overwrite(boolean overwrite) {
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    } else {
+      flags.remove(CreateFlag.OVERWRITE);
+    }
+    return getThisBuilder();
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   */
+  public B append() {
+    flags.add(CreateFlag.APPEND);
+    return getThisBuilder();
   }
 
   protected ChecksumOpt getChecksumOpt() {
     return checksumOpt;
   }
 
-  public FSDataOutputStreamBuilder setChecksumOpt(
-      final ChecksumOpt chksumOpt) {
+  /**
+   * Set checksum opt.
+   */
+  public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
     Preconditions.checkNotNull(chksumOpt);
     checksumOpt = chksumOpt;
-    return this;
+    return getThisBuilder();
   }
 
-  public FSDataOutputStream build() throws IOException {
-    return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
-        getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
-  }
+  /**
+   * Create the FSDataOutputStream to write on the file system.
+   *
+   * @throws HadoopIllegalArgumentException if the parameters are not valid.
+   * @throws IOException on errors when file system creates or appends the file.
+   */
+  public abstract S build() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 89b6ba1..e1329f4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4124,8 +4124,34 @@ public abstract class FileSystem extends Configured implements Closeable {
     return GlobalStorageStatistics.INSTANCE;
   }
 
+  private static final class FileSystemDataOutputStreamBuilder extends
+      FSDataOutputStreamBuilder<FSDataOutputStream,
+        FileSystemDataOutputStreamBuilder> {
+
+    /**
+     * Constructor.
+     */
+    protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+      super(fileSystem, p);
+    }
+
+    @Override
+    public FSDataOutputStream build() throws IOException {
+      return getFS().create(getPath(), getPermission(), getFlags(),
+          getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+          getChecksumOpt());
+    }
+
+    @Override
+    protected FileSystemDataOutputStreamBuilder getThisBuilder() {
+      return this;
+    }
+  }
+
   /**
    * Create a new FSDataOutputStreamBuilder for the file with path.
+   * Files are overwritten by default.
+   *
    * @param path file path
    * @return a FSDataOutputStreamBuilder object to build the file
    *
@@ -4133,7 +4159,8 @@ public abstract class FileSystem extends Configured implements Closeable {
    * builder interface becomes stable.
    */
   @InterfaceAudience.Private
-  protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return new FSDataOutputStreamBuilder(this, path);
+  protected FSDataOutputStreamBuilder createFile(Path path) {
+    return new FileSystemDataOutputStreamBuilder(this, path)
+        .create().overwrite(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 3466922..e940065 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
   }
 
   @Override
-  protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return fs.newFSDataOutputStreamBuilder(path);
+  public FSDataOutputStreamBuilder createFile(Path path) {
+    return fs.createFile(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 6f57c7b..ec3a457 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
   }
 
   @Override
-  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return fs.newFSDataOutputStreamBuilder(path);
+  public FSDataOutputStreamBuilder createFile(Path path) {
+    return fs.createFile(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index a9f5a6e..f9b8cfa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -654,9 +654,9 @@ public class TestLocalFileSystem {
 
     try {
       FSDataOutputStreamBuilder builder =
-          fileSys.newFSDataOutputStreamBuilder(path);
+          fileSys.createFile(path);
       FSDataOutputStream out = builder.build();
-      String content = "Create with a generic type of createBuilder!";
+      String content = "Create with a generic type of createFile!";
       byte[] contentOrigin = content.getBytes("UTF8");
       out.write(contentOrigin);
       out.close();
@@ -675,7 +675,7 @@ public class TestLocalFileSystem {
     // Test value not being set for replication, block size, buffer size
     // and permission
     FSDataOutputStreamBuilder builder =
-        fileSys.newFSDataOutputStreamBuilder(path);
+        fileSys.createFile(path);
     builder.build();
     Assert.assertEquals("Should be default block size",
         builder.getBlockSize(), fileSys.getDefaultBlockSize());
@@ -689,8 +689,8 @@ public class TestLocalFileSystem {
         builder.getPermission(), FsPermission.getFileDefault());
 
     // Test set 0 to replication, block size and buffer size
-    builder = fileSys.newFSDataOutputStreamBuilder(path);
-    builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+    builder = fileSys.createFile(path);
+    builder.bufferSize(0).blockSize(0).replication((short) 0);
     Assert.assertEquals("Block size should be 0",
         builder.getBlockSize(), 0);
     Assert.assertEquals("Replication factor should be 0",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a09c1b5..3e7c899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -101,6 +101,8 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nonnull;
+
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
  * This object is the way end-user code interacts with a Hadoop
@@ -541,6 +543,48 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /**
+   * Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
+   * Progressable, ChecksumOpt, InetSocketAddress[])}, it provides a
+   * HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
+   * EnumSet, int, short, long, Progressable)} with a few additions.
+   *
+   * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
+   * ChecksumOpt, InetSocketAddress[]) for the descriptions of
+   * additional parameters, i.e., favoredNodes and ecPolicyName.
+   */
+  private HdfsDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final ChecksumOpt checksumOpt,
+      final InetSocketAddress[] favoredNodes)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+      @Override
+      public HdfsDataOutputStream doCall(final Path p) throws IOException {
+        final DFSOutputStream out = dfs.create(getPathName(f), permission,
+            flag, false, replication, blockSize, progress, bufferSize,
+            checksumOpt, favoredNodes);
+        return dfs.createWrappedOutputStream(out, statistics);
+      }
+      @Override
+      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.createNonRecursive(p, permission, flag, bufferSize,
+              replication, blockSize, progress, checksumOpt, favoredNodes);
+        }
+        throw new UnsupportedOperationException("Cannot create with" +
+            " favoredNodes through a symlink to a non-DistributedFileSystem: "
+            + f + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
   @Override
@@ -2603,40 +2647,120 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /**
-   * Extends FSDataOutputStreamBuilder to support special requirements
-   * of DistributedFileSystem.
+   * HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
+   * write file on HDFS.
    */
-  public static class HdfsDataOutputStreamBuilder
-      extends FSDataOutputStreamBuilder {
+  public static final class HdfsDataOutputStreamBuilder
+      extends FSDataOutputStreamBuilder<
+      HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
     private final DistributedFileSystem dfs;
     private InetSocketAddress[] favoredNodes = null;
 
-    public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
+    /**
+     * Construct a HdfsDataOutputStream builder for a file.
+     * @param dfs the {@link DistributedFileSystem} instance.
+     * @param path the path of the file to create / append.
+     */
+    private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
       super(dfs, path);
       this.dfs = dfs;
     }
 
-    protected InetSocketAddress[] getFavoredNodes() {
+    @Override
+    protected HdfsDataOutputStreamBuilder getThisBuilder() {
+      return this;
+    }
+
+    private InetSocketAddress[] getFavoredNodes() {
       return favoredNodes;
     }
 
-    public HdfsDataOutputStreamBuilder setFavoredNodes(
-        final InetSocketAddress[] nodes) {
+    /**
+     * Set favored DataNodes.
+     * @param nodes the addresses of the favored DataNodes.
+     */
+    public HdfsDataOutputStreamBuilder favoredNodes(
+        @Nonnull final InetSocketAddress[] nodes) {
       Preconditions.checkNotNull(nodes);
       favoredNodes = nodes.clone();
       return this;
     }
 
+    /**
+     * Force closed blocks to disk.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder syncBlock() {
+      getFlags().add(CreateFlag.SYNC_BLOCK);
+      return this;
+    }
+
+    /**
+     * Create the block on transient storage if possible.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder lazyPersist() {
+      getFlags().add(CreateFlag.LAZY_PERSIST);
+      return this;
+    }
+
+    /**
+     * Append data to a new block instead of the end of the last partial block.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder newBlock() {
+      getFlags().add(CreateFlag.NEW_BLOCK);
+      return this;
+    }
+
+    /**
+     * Advise that a block replica NOT be written to the local DataNode.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder noLocalWrite() {
+      getFlags().add(CreateFlag.NO_LOCAL_WRITE);
+      return this;
+    }
+
+    @VisibleForTesting
+    @Override
+    protected EnumSet<CreateFlag> getFlags() {
+      return super.getFlags();
+    }
+
+    /**
+     * Build HdfsDataOutputStream to write.
+     *
+     * @return a fully-initialized OutputStream.
+     * @throws IOException on I/O errors.
+     */
     @Override
     public HdfsDataOutputStream build() throws IOException {
-      return dfs.create(getPath(), getPermission(), getFlags(),
-          getBufferSize(), getReplication(), getBlockSize(),
-          getProgress(), getChecksumOpt(), getFavoredNodes());
+      if (isRecursive()) {
+        return dfs.create(getPath(), getPermission(), getFlags(),
+            getBufferSize(), getReplication(), getBlockSize(),
+            getProgress(), getChecksumOpt(), getFavoredNodes());
+      } else {
+        return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
+            getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+            getChecksumOpt(), getFavoredNodes());
+      }
     }
   }
 
+  /**
+   * Create a HdfsDataOutputStreamBuilder to create a file on DFS.
+   * Similar to {@link #create(Path)}, file is overwritten by default.
+   *
+   * @param path the path of the file to create.
+   * @return A HdfsDataOutputStreamBuilder for creating a file.
+   */
   @Override
-  public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return new HdfsDataOutputStreamBuilder(this, path);
+  public HdfsDataOutputStreamBuilder createFile(Path path) {
+    return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 0041841..30d014d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -242,7 +242,10 @@ public class NameNodeConnector implements Closeable {
         IOUtils.closeStream(fs.append(idPath));
         fs.delete(idPath, true);
       }
-      final FSDataOutputStream fsout = fs.create(idPath, false);
+
+      final FSDataOutputStream fsout = fs.createFile(idPath)
+          .overwrite(false).recursive().build();
+
       // mark balancer idPath to be deleted during filesystem closure
       fs.deleteOnExit(idPath);
       if (write2IdFile) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 24aa1be..fe2dbb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FileChecksum;
@@ -78,6 +79,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -1542,36 +1544,84 @@ public class TestDistributedFileSystem {
     }
   }
 
+  private void testBuilderSetters(DistributedFileSystem fs) {
+    Path testFilePath = new Path("/testBuilderSetters");
+    HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
+
+    builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite();
+    EnumSet<CreateFlag> flags = builder.getFlags();
+    assertTrue(flags.contains(CreateFlag.APPEND));
+    assertTrue(flags.contains(CreateFlag.CREATE));
+    assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
+    assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
+    assertFalse(flags.contains(CreateFlag.OVERWRITE));
+    assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
+  }
+
+  @Test
+  public void testHdfsDataOutputStreamBuilderSetParameters()
+      throws IOException {
+    Configuration conf = getTestConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      testBuilderSetters(fs);
+    }
+  }
+
   @Test
   public void testDFSDataOutputStreamBuilder() throws Exception {
     Configuration conf = getTestConfiguration();
-    MiniDFSCluster cluster = null;
     String testFile = "/testDFSDataOutputStreamBuilder";
     Path testFilePath = new Path(testFile);
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
       DistributedFileSystem fs = cluster.getFileSystem();
 
       // Test create an empty file
-      FSDataOutputStream out =
-          fs.newFSDataOutputStreamBuilder(testFilePath).build();
-      out.close();
+      try (FSDataOutputStream out =
+               fs.createFile(testFilePath).build()) {
+        LOG.info("Test create an empty file");
+      }
 
       // Test create a file with content, and verify the content
       String content = "This is a test!";
-      out = fs.newFSDataOutputStreamBuilder(testFilePath)
-          .setBufferSize(4096).setReplication((short) 1)
-          .setBlockSize(4096).build();
-      byte[] contentOrigin = content.getBytes("UTF8");
-      out.write(contentOrigin);
-      out.close();
+      try (FSDataOutputStream out1 = fs.createFile(testFilePath)
+          .bufferSize(4096)
+          .replication((short) 1)
+          .blockSize(4096)
+          .build()) {
+        byte[] contentOrigin = content.getBytes("UTF8");
+        out1.write(contentOrigin);
+      }
 
       ContractTestUtils.verifyFileContents(fs, testFilePath,
           content.getBytes());
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
+
+      try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
+        .build()) {
+        fail("it should fail to overwrite an existing file");
+      } catch (FileAlreadyExistsException e) {
+        // As expected, ignore.
+      }
+
+      Path nonParentFile = new Path("/parent/test");
+      try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
+        fail("parent directory not exist");
+      } catch (FileNotFoundException e) {
+        // As expected.
+      }
+      assertFalse("parent directory should not be created",
+          fs.exists(new Path("/parent")));
+
+      try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
+        .build()) {
+        out.write(1);
       }
+      assertTrue("parent directory has not been created",
+          fs.exists(new Path("/parent")));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index 50e56cc..3352fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
       InetSocketAddress[] dns = getDatanodes(rand);
       Path p = new Path("/filename"+i);
       FSDataOutputStream out =
-          dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
+          dfs.createFile(p).favoredNodes(dns).build();
       out.write(SOME_BYTES);
       out.close();
       BlockLocation[] locations = getBlockLocations(p);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message