hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [05/14] hadoop git commit: HADOOP-14993. AliyunOSS: Override listFiles and listLocatedStatus. Contributed by Genmao Yu
Date Wed, 15 Nov 2017 19:49:36 GMT
HADOOP-14993. AliyunOSS: Override listFiles and listLocatedStatus. Contributed by Genmao Yu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18621af7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18621af7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18621af7

Branch: refs/heads/YARN-6592
Commit: 18621af7ae8f8ed703245744f8f2a770d07bbfb9
Parents: 4f40cd3
Author: Kai Zheng <kai.zheng@intel.com>
Authored: Tue Nov 14 17:58:37 2017 +0800
Committer: Kai Zheng <kai.zheng@intel.com>
Committed: Tue Nov 14 17:58:37 2017 +0800

----------------------------------------------------------------------
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      |  75 +++++++++--
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 106 ++++++++++++++++
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    |  12 ++
 .../fs/aliyun/oss/FileStatusAcceptor.java       | 125 +++++++++++++++++++
 .../site/markdown/tools/hadoop-aliyun/index.md  |   6 +-
 5 files changed, 309 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/18621af7/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 3561b02..41d475d 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -28,14 +28,18 @@ import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
@@ -46,6 +50,7 @@ import com.aliyun.oss.model.ObjectMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
 /**
@@ -60,6 +65,12 @@ public class AliyunOSSFileSystem extends FileSystem {
   private Path workingDir;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
+  /**
+   * Path filter that accepts every path; used by the single-argument
+   * {@code listLocatedStatus(Path)} overload.
+   */
+  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path file) {
+      return true;
+    }
+  };
 
   @Override
   public FSDataOutputStream append(Path path, int bufferSize,
@@ -302,18 +313,6 @@ public class AliyunOSSFileSystem extends FileSystem {
   }
 
   /**
-   * Check if OSS object represents a directory.
-   *
-   * @param name object key
-   * @param size object content length
-   * @return true if object represents a directory
-   */
-  private boolean objectRepresentsDirectory(final String name,
-      final long size) {
-    return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
-  }
-
-  /**
    * Turn a path (relative or otherwise) into an OSS key.
    *
    * @param path the path of the file.
@@ -404,6 +403,58 @@ public class AliyunOSSFileSystem extends FileSystem {
     return result.toArray(new FileStatus[result.size()]);
   }
 
+  /**
+   * List the files under a path, optionally descending into subdirectories.
+   * Directory entries are never returned. If {@code f} is itself a file,
+   * the iterator yields that file alone.
+   *
+   * @param f path to list
+   * @param recursive whether to descend into subdirectories
+   * @return an iterator over the located file statuses
+   * @throws IOException if the status lookup or the listing fails
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(
+      final Path f, final boolean recursive) throws IOException {
+    final Path qualifiedPath = f.makeQualified(uri, workingDir);
+    final FileStatus status = getFileStatus(qualifiedPath);
+    // Entries produced by the store listing carry fully-qualified paths, so
+    // the self-exclusion check must compare against the qualified form of f;
+    // comparing with the raw argument never matches a relative or
+    // scheme-less path.
+    PathFilter filter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return status.isFile() || !path.equals(qualifiedPath);
+      }
+    };
+    FileStatusAcceptor acceptor =
+        new FileStatusAcceptor.AcceptFilesOnly(qualifiedPath);
+    return innerList(f, status, filter, acceptor, recursive);
+  }
+
+  /**
+   * List the entries directly under {@code f} (or {@code f} itself when it
+   * is a file) together with their block locations, without filtering.
+   *
+   * @param f the path to list
+   * @return an iterator over the located statuses
+   * @throws IOException if the status lookup or the listing fails
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws IOException {
+    final PathFilter acceptEverything = DEFAULT_FILTER;
+    return listLocatedStatus(f, acceptEverything);
+  }
+
+  /**
+   * List the entries directly under {@code f} that pass {@code filter},
+   * together with their block locations. When {@code f} is a file, the
+   * iterator yields that file alone (subject to the filter).
+   *
+   * @param f the path to list
+   * @param filter filter applied to each candidate path
+   * @return an iterator over the accepted located statuses
+   * @throws IOException if the status lookup or the listing fails
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
+      final PathFilter filter) throws IOException {
+    final Path qualified = f.makeQualified(uri, workingDir);
+    final FileStatus fileStatus = getFileStatus(qualified);
+    // Non-recursive listing that drops the entry for the listed path itself.
+    return innerList(f, fileStatus, filter,
+        new FileStatusAcceptor.AcceptAllButSelf(qualified), false);
+  }
+
+  /**
+   * Shared implementation behind {@code listFiles} and
+   * {@code listLocatedStatus}.
+   *
+   * @param f the path being listed (qualified here before use)
+   * @param status the already-fetched status of {@code f}
+   * @param filter path filter; always handed qualified paths
+   * @param acceptor decides which listed keys and prefixes become entries
+   * @param recursive if false, the store listing uses "/" as delimiter so
+   *                  only immediate children are returned
+   * @return an iterator over the matching located statuses
+   * @throws IOException propagated from {@code getFileBlockLocations}
+   */
+  private RemoteIterator<LocatedFileStatus> innerList(final Path f,
+      final FileStatus status,
+      final PathFilter filter,
+      final FileStatusAcceptor acceptor,
+      final boolean recursive) throws IOException {
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+
+    if (status.isFile()) {
+      LOG.debug("{} is a File", qualifiedPath);
+      // Give the filter the qualified path, matching the path form used for
+      // entries produced by a directory listing.
+      if (!filter.accept(qualifiedPath)) {
+        // A null status makes the iterator empty; no locations needed.
+        return store.singleStatusRemoteIterator(null, null);
+      }
+      final BlockLocation[] locations = getFileBlockLocations(status,
+          0, status.getLen());
+      return store.singleStatusRemoteIterator(status, locations);
+    }
+
+    String key = pathToKey(qualifiedPath);
+    return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
+        acceptor, recursive ? null : "/");
+  }
+
   /**
    * Used to create an empty file that represents an empty directory.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18621af7/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index a85a739..2e8edc7 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -46,7 +46,13 @@ import com.aliyun.oss.model.UploadPartResult;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +64,8 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
@@ -546,4 +554,102 @@ public class AliyunOSSFileSystemStore {
       LOG.error("Failed to purge " + prefix);
     }
   }
+
+  public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
+      final FileStatus fileStatus, final BlockLocation[] locations) {
+    return new RemoteIterator<LocatedFileStatus>() {
+      private boolean hasNext = true;
+      @Override
+      public boolean hasNext() throws IOException {
+        return fileStatus != null && hasNext;
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        if (hasNext()) {
+          LocatedFileStatus s = new LocatedFileStatus(fileStatus,
+              fileStatus.isFile() ? locations : null);
+          hasNext = false;
+          return s;
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+    };
+  }
+
+  /**
+   * Create an iterator that lazily pages through an OSS listing under
+   * {@code prefix}, turning each accepted object summary and common prefix
+   * into a {@link LocatedFileStatus}.
+   *
+   * @param prefix key prefix to list; a trailing "/" is added if missing
+   * @param maxListingLength maximum number of keys requested per call
+   * @param fs filesystem used to qualify paths and compute block locations
+   * @param filter path filter applied to every candidate entry
+   * @param acceptor decides whether a summary or prefix becomes an entry
+   * @param delimiter listing delimiter passed to the request; the callers
+   *                  visible here pass {@code null} for a recursive listing
+   *                  and "/" for immediate children only
+   * @return a remote iterator over the accepted entries
+   */
+  public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
+      final String prefix, final int maxListingLength, final FileSystem fs,
+      final PathFilter filter, final FileStatusAcceptor acceptor,
+      final String delimiter) {
+    return new RemoteIterator<LocatedFileStatus>() {
+      private String nextMarker = null;
+      private boolean firstListing = true;
+      private boolean meetEnd = false;
+      private ListIterator<FileStatus> batchIterator;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        if (firstListing) {
+          requestNextBatch();
+          firstListing = false;
+        }
+        return batchIterator.hasNext() || requestNextBatch();
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        FileStatus status = batchIterator.next();
+        BlockLocation[] locations = fs.getFileBlockLocations(status,
+            0, status.getLen());
+        return new LocatedFileStatus(
+            status, status.isFile() ? locations : null);
+      }
+
+      /**
+       * Fetch listing pages until one yields at least one accepted entry or
+       * the listing is exhausted. A page can be rejected entirely by the
+       * filter/acceptor (e.g. only directory markers when files alone are
+       * accepted) while more pages remain, so stopping after a single empty
+       * page would end the iteration too early.
+       *
+       * @return true if the current batch has at least one entry
+       */
+      private boolean requestNextBatch() {
+        while (!meetEnd) {
+          if (listOnePage()) {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      /**
+       * Issue one listObjects call and replace the current batch with the
+       * entries it accepts; records the next marker or the end of listing.
+       *
+       * @return true if the new batch is non-empty
+       */
+      private boolean listOnePage() {
+        ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+        listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
+        listRequest.setMaxKeys(maxListingLength);
+        listRequest.setMarker(nextMarker);
+        listRequest.setDelimiter(delimiter);
+        ObjectListing listing = ossClient.listObjects(listRequest);
+        List<FileStatus> stats = new ArrayList<>(
+            listing.getObjectSummaries().size() +
+            listing.getCommonPrefixes().size());
+        for (OSSObjectSummary summary : listing.getObjectSummaries()) {
+          String key = summary.getKey();
+          Path path = fs.makeQualified(new Path("/" + key));
+          if (filter.accept(path) && acceptor.accept(path, summary)) {
+            FileStatus status = new FileStatus(summary.getSize(),
+                key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
+                summary.getLastModified().getTime(), path);
+            stats.add(status);
+          }
+        }
+
+        for (String commonPrefix : listing.getCommonPrefixes()) {
+          Path path = fs.makeQualified(new Path("/" + commonPrefix));
+          if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
+            FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
+            stats.add(status);
+          }
+        }
+
+        batchIterator = stats.listIterator();
+        if (listing.isTruncated()) {
+          nextMarker = listing.getNextMarker();
+        } else {
+          meetEnd = true;
+        }
+        statistics.incrementReadOps(1);
+        return batchIterator.hasNext();
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18621af7/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index cae9749..fdf72e4 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -164,4 +164,16 @@ final public class AliyunOSSUtils {
       return key;
     }
   }
+
+  /**
+   * Decide whether an OSS object is a directory marker: a non-empty key
+   * ending in "/" whose content length is zero.
+   *
+   * @param name object key (may be null or empty)
+   * @param size object content length
+   * @return true if the object represents a directory
+   */
+  public static boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    if (size != 0L || StringUtils.isEmpty(name)) {
+      return false;
+    }
+    return name.endsWith("/");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18621af7/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/FileStatusAcceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/FileStatusAcceptor.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/FileStatusAcceptor.java
new file mode 100644
index 0000000..fd01047
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/FileStatusAcceptor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.model.OSSObjectSummary;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
+
+/**
+ * Interface to implement by the logic deciding whether to accept a summary
+ * entry or path as a valid file or directory.
+ */
+public interface FileStatusAcceptor {
+
+  /**
+   * Predicate to decide whether or not to accept a summary entry.
+   * @param keyPath qualified path to the entry
+   * @param summary summary entry
+   * @return true if the entry is accepted (i.e. that a status entry
+   * should be generated.)
+   */
+  boolean accept(Path keyPath, OSSObjectSummary summary);
+
+  /**
+   * Predicate to decide whether or not to accept a prefix.
+   * @param keyPath qualified path to the entry
+   * @param commonPrefix the prefix
+   * @return true if the entry is accepted (i.e. that a status entry
+   * should be generated.)
+   */
+  boolean accept(Path keyPath, String commonPrefix);
+
+  /**
+   * Accept only file entries. The base path itself, objects that are
+   * directory markers, and every common prefix are rejected.
+   */
+  class AcceptFilesOnly implements FileStatusAcceptor {
+
+    /** Base path, already qualified; its own entry is never accepted. */
+    private final Path qualifiedPath;
+
+    /**
+     * Constructor.
+     * @param qualifiedPath an already-qualified path.
+     */
+    public AcceptFilesOnly(Path qualifiedPath) {
+      this.qualifiedPath = qualifiedPath;
+    }
+
+    /**
+     * Reject a summary entry if the key path is the qualified Path, or if
+     * the object is a directory marker (a non-empty key ending in "/" with
+     * zero length).
+     * @param keyPath key path of the entry
+     * @param summary summary entry
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.)
+     */
+    @Override
+    public boolean accept(Path keyPath, OSSObjectSummary summary) {
+      return !keyPath.equals(qualifiedPath)
+          && !objectRepresentsDirectory(summary.getKey(), summary.getSize());
+    }
+
+    /**
+     * Accept no directory paths.
+     * @param keyPath qualified path to the entry
+     * @param prefix common prefix in listing.
+     * @return false, always.
+     */
+    @Override
+    public boolean accept(Path keyPath, String prefix) {
+      return false;
+    }
+  }
+
+  /**
+   * Accept all entries, files and directories alike, except the base path.
+   */
+  class AcceptAllButSelf implements FileStatusAcceptor {
+
+    /** Base path. */
+    private final Path qualifiedPath;
+
+    /**
+     * Constructor.
+     * @param qualifiedPath an already-qualified path.
+     */
+    public AcceptAllButSelf(Path qualifiedPath) {
+      this.qualifiedPath = qualifiedPath;
+    }
+
+    /**
+     * Reject a summary entry if the key path is the qualified Path.
+     * @param keyPath key path of the entry
+     * @param summary summary entry
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.)
+     */
+    @Override
+    public boolean accept(Path keyPath, OSSObjectSummary summary) {
+      return !keyPath.equals(qualifiedPath);
+    }
+
+    /**
+     * Accept all prefixes except the one for the base path, "self".
+     * @param keyPath qualified path to the entry
+     * @param prefix common prefix in listing.
+     * @return true if the entry is accepted (i.e. that a status entry
+     * should be generated.)
+     */
+    @Override
+    public boolean accept(Path keyPath, String prefix) {
+      return !keyPath.equals(qualifiedPath);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/18621af7/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
index 88c83b5..2913279 100644
--- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
+++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
@@ -56,14 +56,14 @@ Authorization occurs at the level of the entire Aliyun account via
 4. The append operation is not supported.
 
 ### Warning #2: Directory last access time is not tracked,
-features of Hadoop relying on this can have unexpected behaviour. E.g. the
-AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
+Features of Hadoop that rely on it may behave unexpectedly. For example, the
+AggregatedLogDeletionService of YARN will not remove the appropriate log files.
 
 ### Warning #3: Your Aliyun credentials are valuable
 
 Your Aliyun credentials not only pay for services, they offer read and write
 access to the data. Anyone with the account can not only read your datasets
-—they can delete them.
+— they can delete them.
 
 Do not inadvertently share these credentials through means such as
 1. Checking in to SCM any configuration files containing the secrets.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message