hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dran...@apache.org
Subject [2/2] hadoop git commit: HADOOP-12756. Incorporate Aliyun OSS file system implementation. Contributed by Mingfei Shi and Lin Zhou
Date Thu, 04 Aug 2016 13:21:53 GMT
HADOOP-12756. Incorporate Aliyun OSS file system implementation. Contributed by Mingfei Shi and Lin Zhou


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8346f922
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8346f922
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8346f922

Branch: refs/heads/HADOOP-12756
Commit: 8346f9222ab7fc77ff79edaeda4dbca16b5fa76f
Parents: 70c2781
Author: Kai Zheng <kai.zheng@intel.com>
Authored: Thu Aug 4 21:21:10 2016 +0800
Committer: Kai Zheng <kai.zheng@intel.com>
Committed: Thu Aug 4 21:21:10 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 hadoop-project/pom.xml                          |  22 +
 .../dev-support/findbugs-exclude.xml            |  18 +
 hadoop-tools/hadoop-aliyun/pom.xml              | 133 +++
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      | 847 +++++++++++++++++++
 .../fs/aliyun/oss/AliyunOSSInputStream.java     | 268 ++++++
 .../fs/aliyun/oss/AliyunOSSOutputStream.java    | 219 +++++
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    | 151 ++++
 .../apache/hadoop/fs/aliyun/oss/Constants.java  | 110 +++
 .../hadoop/fs/aliyun/oss/package-info.java      |  22 +
 .../hadoop/fs/aliyun/oss/OSSTestUtils.java      |  80 ++
 .../aliyun/oss/TestOSSFileSystemContract.java   | 253 ++++++
 .../fs/aliyun/oss/TestOSSInputStream.java       | 141 +++
 .../fs/aliyun/oss/TestOSSOutputStream.java      |  71 ++
 .../fs/aliyun/oss/contract/OSSContract.java     |  54 ++
 .../oss/contract/TestOSSContractCreate.java     |  41 +
 .../oss/contract/TestOSSContractDelete.java     |  34 +
 .../oss/contract/TestOSSContractMkdir.java      |  34 +
 .../oss/contract/TestOSSContractOpen.java       |  34 +
 .../oss/contract/TestOSSContractRename.java     |  35 +
 .../oss/contract/TestOSSContractSeek.java       |  34 +
 .../src/test/resources/contract/oss.xml         | 105 +++
 .../src/test/resources/core-site.xml            |  46 +
 .../src/test/resources/log4j.properties         |  23 +
 hadoop-tools/hadoop-tools-dist/pom.xml          |   6 +
 hadoop-tools/pom.xml                            |   1 +
 26 files changed, 2784 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 998287d..070b14e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,3 +26,5 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
 hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
 hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml
 patchprocess/
+hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml
+hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index dee79f7..5b5b59a 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -438,6 +438,12 @@
 
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aliyun</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-kms</artifactId>
         <version>${project.version}</version>
         <classifier>classes</classifier>
@@ -999,6 +1005,22 @@
         <version>4.2.0</version>
      </dependency>
 
+      <dependency>
+        <groupId>com.aliyun.oss</groupId>
+        <artifactId>aliyun-sdk-oss</artifactId>
+        <version>2.2.1</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+          </exclusion>
+        </exclusions>
+     </dependency>
+
      <dependency>
        <groupId>xerces</groupId>
        <artifactId>xercesImpl</artifactId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..40d78d0
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -0,0 +1,18 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml
new file mode 100644
index 0000000..c87d13f
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-alpha2-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <artifactId>hadoop-aliyun</artifactId>
+  <name>Apache Hadoop Aliyun OSS support</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>tests-off</id>
+      <activation>
+        <file>
+          <missing>src/test/resources/auth-keys.xml</missing>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+    <profile>
+      <id>tests-on</id>
+      <activation>
+        <file>
+          <exists>src/test/resources/auth-keys.xml</exists>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>false</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <configuration>
+          <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>deplist</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>list</goal>
+            </goals>
+            <configuration>
+              <!-- build a shellprofile -->
+              <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.aliyun.oss</groupId>
+      <artifactId>aliyun-sdk-oss</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
new file mode 100644
index 0000000..30ddf8c
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.common.auth.DefaultCredentials;
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.UserInfo;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.util.Progressable;
+
+import com.aliyun.oss.ClientConfiguration;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.model.AbortMultipartUploadRequest;
+import com.aliyun.oss.model.CannedAccessControlList;
+import com.aliyun.oss.model.CompleteMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.CopyObjectResult;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PartETag;
+import com.aliyun.oss.model.UploadPartCopyRequest;
+import com.aliyun.oss.model.UploadPartCopyResult;
+
+/**
+ * Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
+ * Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
+ */
+public class AliyunOSSFileSystem extends FileSystem {
+
+  private URI uri;
+  private Path workingDir;
+  private OSSClient ossClient;
+  private String bucketName;
+  private long uploadPartSize;
+  private long multipartThreshold;
+  private int maxKeys;
+  private String serverSideEncryptionAlgorithm;
+
+  @Override
+  public FSDataOutputStream append(Path path, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Append is not supported!");
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (ossClient != null) {
+        ossClient.shutdown();
+      }
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    String key = pathToKey(path);
+
+    if (!overwrite && exists(path)) {
+      throw new FileAlreadyExistsException(path + " already exists");
+    }
+
+    return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
+        ossClient, bucketName, key, progress, statistics,
+        serverSideEncryptionAlgorithm), (Statistics)(null));
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) throws IOException {
+    FileStatus status;
+    try {
+      status = getFileStatus(path);
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete " + path + ": Does not exist!");
+      }
+      return false;
+    }
+
+    String key = pathToKey(status.getPath());
+    if (status.isDirectory()) {
+      if (!key.endsWith("/")) {
+        key += "/";
+      }
+      if (!recursive) {
+        FileStatus[] statuses = listStatus(status.getPath());
+        // Check whether it is an empty directory or not
+        if (statuses.length > 0) {
+          throw new IOException("Cannot remove directory " + path +
+              ": It is not empty!");
+        } else {
+          // Delete empty directory without '-r'
+          ossClient.deleteObject(bucketName, key);
+          statistics.incrementWriteOps(1);
+        }
+      } else {
+        ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+        listRequest.setPrefix(key);
+        listRequest.setMaxKeys(maxKeys);
+
+        while (true) {
+          ObjectListing objects = ossClient.listObjects(listRequest);
+          statistics.incrementReadOps(1);
+          List<String> keysToDelete = new ArrayList<String>();
+          for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+            keysToDelete.add(objectSummary.getKey());
+          }
+          DeleteObjectsRequest deleteRequest =
+              new DeleteObjectsRequest(bucketName);
+          deleteRequest.setKeys(keysToDelete);
+          ossClient.deleteObjects(deleteRequest);
+          statistics.incrementWriteOps(1);
+          if (objects.isTruncated()) {
+            listRequest.setMarker(objects.getNextMarker());
+          } else {
+            break;
+          }
+        }
+      }
+    } else {
+      ossClient.deleteObject(bucketName, key);
+      statistics.incrementWriteOps(1);
+    }
+    // TODO: optimize; below re-creates the parent dir marker if it is gone
+    try {
+      Path pPath = status.getPath().getParent();
+      FileStatus pStatus = getFileStatus(pPath);
+      if (pStatus.isDirectory()) {
+        return true;
+      } else {
+        throw new IOException("Path " + pPath +
+            " is assumed to be a directory!");
+      }
+    } catch (FileNotFoundException fnfe) {
+      // Make sure the parent directory exists
+      return mkdir(bucketName, pathToKey(status.getPath().getParent()));
+    }
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    Path qualifiedPath = path.makeQualified(uri, workingDir);
+    String key = pathToKey(qualifiedPath);
+
+    // Root always exists
+    if (key.length() == 0) {
+      return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+    }
+
+    ObjectMetadata meta = getObjectMetadata(key);
+    // If key not found and key does not end with "/"
+    if (meta == null && !key.endsWith("/")) {
+      // Case: dir + "/"
+      key += "/";
+      meta = getObjectMetadata(key);
+    }
+    if (meta == null) {
+      // Case: dir + "/" + file
+      ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+      listRequest.setPrefix(key);
+      listRequest.setDelimiter("/");
+      listRequest.setMaxKeys(1);
+
+      ObjectListing listing = ossClient.listObjects(listRequest);
+      statistics.incrementReadOps(1);
+      if (!listing.getObjectSummaries().isEmpty() ||
+          !listing.getCommonPrefixes().isEmpty()) {
+        return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+      } else {
+        throw new FileNotFoundException(path + ": No such file or directory!");
+      }
+    } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
+      return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+    } else {
+      return new FileStatus(meta.getContentLength(), false, 1,
+          getDefaultBlockSize(path), meta.getLastModified().getTime(),
+          qualifiedPath);
+    }
+  }
+
+  /**
+   * Look up the metadata of an object, counting one read operation.
+   *
+   * @param key object key
+   * @return the metadata, or null if the lookup throws an OSSException
+   */
+  private ObjectMetadata getObjectMetadata(String key) {
+    try {
+      return ossClient.getObjectMetadata(bucketName, key);
+    } catch (OSSException osse) {
+      return null;
+    } finally {
+      statistics.incrementReadOps(1);
+    }
+  }
+
+  @Override
+  public String getScheme() {
+    return "oss";
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    // Does not support Token
+    return null;
+  }
+
+  /**
+   * Initialize new FileSystem.
+   *
+   * @param name the uri of the file system, including host, port, etc.
+   *
+   * @param conf configuration of the file system
+   * @throws IOException IO problems
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+
+    uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
+    workingDir =
+        new Path("/user",
+            System.getProperty("user.name")).makeQualified(uri, null);
+
+    bucketName = name.getHost();
+
+    ClientConfiguration clientConf = new ClientConfiguration();
+    clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
+        MAXIMUM_CONNECTIONS_DEFAULT));
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
+        SECURE_CONNECTIONS_DEFAULT);
+    clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+    clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
+        MAX_ERROR_RETRIES_DEFAULT));
+    clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
+        ESTABLISH_TIMEOUT_DEFAULT));
+    clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
+        SOCKET_TIMEOUT_DEFAULT));
+
+    String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
+    int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
+    if (!proxyHost.isEmpty()) {
+      clientConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        clientConf.setProxyPort(proxyPort);
+      } else {
+        if (secureConnections) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          clientConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          clientConf.setProxyPort(80);
+        }
+      }
+      String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
+      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
+            PROXY_PASSWORD_KEY + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      clientConf.setProxyUsername(proxyUsername);
+      clientConf.setProxyPassword(proxyPassword);
+      clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
+      clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
+    } else if (proxyPort >= 0) {
+      String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
+          PROXY_HOST_KEY;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
+    ossClient =
+        new OSSClient(endPoint, getCredentialsProvider(name, conf), clientConf);
+
+    maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+    uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
+    if (uploadPartSize < 5 * 1024 * 1024) {
+      LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
+      uploadPartSize = 5 * 1024 * 1024;
+    }
+
+    if (multipartThreshold < 5 * 1024 * 1024) {
+      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
+      multipartThreshold = 5 * 1024 * 1024;
+    }
+
+    if (multipartThreshold > 1024 * 1024 * 1024) {
+      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
+      multipartThreshold = 1024 * 1024 * 1024;
+    }
+
+    String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
+    if (!cannedACLName.isEmpty()) {
+      CannedAccessControlList cannedACL =
+          CannedAccessControlList.valueOf(cannedACLName);
+      ossClient.setBucketAcl(bucketName, cannedACL);
+    }
+
+    serverSideEncryptionAlgorithm =
+        conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
+
+    setConf(conf);
+  }
+
+  /**
+   * Create the default credential provider, or load in one explicitly
+   * identified in the configuration.
+   * @param name the uri of the file system
+   * @param conf configuration
+   * @return a credential provider
+   * @throws IOException on any problem. Class construction issues may be
+   * nested inside the IOE.
+   */
+  private CredentialsProvider getCredentialsProvider(URI name,
+      Configuration conf) throws IOException {
+    CredentialsProvider credentials;
+
+    String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
+    if (StringUtils.isEmpty(className)) {
+      Configuration newConf =
+          ProviderUtils.excludeIncompatibleCredentialProviders(conf,
+              AliyunOSSFileSystem.class);
+      String accessKey =
+          AliyunOSSUtils.getPassword(newConf, ACCESS_KEY,
+              UserInfo.EMPTY.getUser());
+      String secretKey =
+          AliyunOSSUtils.getPassword(newConf, SECRET_KEY,
+              UserInfo.EMPTY.getPassword());
+      credentials =
+          new DefaultCredentialProvider(
+              new DefaultCredentials(accessKey, secretKey));
+
+    } else {
+      try {
+        LOG.debug("Credential provider class is:" + className);
+        Class<?> credClass = Class.forName(className);
+        try {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor(
+                  URI.class, Configuration.class).newInstance(this.uri, conf);
+        } catch (NoSuchMethodException | SecurityException e) {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor()
+              .newInstance();
+        }
+      } catch (ClassNotFoundException e) {
+        throw new IOException(className + " not found.", e);
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IOException(String.format("%s constructor exception.  A " +
+            "class specified in %s must provide an accessible constructor " +
+            "accepting URI and Configuration, or an accessible default " +
+            "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), e);
+      } catch (ReflectiveOperationException | IllegalArgumentException e) {
+        throw new IOException(className + " instantiation exception.", e);
+      }
+    }
+
+    return credentials;
+  }
+
+  /**
+   * Check if OSS object represents a directory.
+   *
+   * @param name object key
+   * @param size object content length
+   * @return true if object represents a directory
+   */
+  private boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return !name.isEmpty() && name.endsWith("/") && size == 0L;
+  }
+
+  /**
+   * Turns a path (resolved against workingDir if relative) into an OSS key.
+   *
+   * @param path the path of the file
+   * @return the object key: the URI path of the file minus its leading "/"
+   */
+  private String pathToKey(Path path) {
+    if (!path.isAbsolute()) {
+      path = new Path(workingDir, path);
+    }
+
+    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+      return "";
+    }
+
+    return path.toUri().getPath().substring(1);
+  }
+
+  private Path keyToPath(String key) {
+    return new Path("/" + key);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    String key = pathToKey(path);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("List status for path: " + path);
+    }
+
+    final List<FileStatus> result = new ArrayList<FileStatus>();
+    final FileStatus fileStatus = getFileStatus(path);
+    // Listed keys are qualified below, so qualify path before comparing.
+    final Path qualifiedPath = path.makeQualified(uri, workingDir);
+
+    if (fileStatus.isDirectory()) {
+      if (!key.endsWith("/")) {
+        key = key + "/";
+      }
+
+      ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+      listRequest.setPrefix(key);
+      listRequest.setDelimiter("/");
+      listRequest.setMaxKeys(maxKeys);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("listStatus: doing listObjects for directory " + key);
+      }
+
+      while (true) {
+        ObjectListing objects = ossClient.listObjects(listRequest);
+        statistics.incrementReadOps(1);
+        for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+          Path keyPath = keyToPath(objectSummary.getKey())
+              .makeQualified(uri, workingDir);
+          if (keyPath.equals(qualifiedPath)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Ignoring: " + keyPath);
+            }
+            continue;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: fi: " + keyPath);
+            }
+            result.add(new FileStatus(objectSummary.getSize(), false, 1,
+                getDefaultBlockSize(keyPath),
+                objectSummary.getLastModified().getTime(), keyPath));
+          }
+        }
+
+        for (String prefix : objects.getCommonPrefixes()) {
+          Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
+          if (keyPath.equals(qualifiedPath)) {
+            continue;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: rd: " + keyPath);
+            }
+            result.add(new FileStatus(0, true, 1, 0, 0, keyPath));
+          }
+        }
+
+        if (objects.isTruncated()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("listStatus: list truncated - getting next batch");
+          }
+          listRequest.setMarker(objects.getNextMarker());
+        } else {
+          break;
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding: rd (not a dir): " + path);
+      }
+      result.add(fileStatus);
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+  /**
+   * Used to create an empty file that represents an empty directory.
+   *
+   * @param bucket the bucket this directory belongs to
+   * @param objectName directory key; a trailing "/" is appended if missing
+   * @return true once the put request has returned normally
+   * @throws IOException declared because the input stream is closed here
+   */
+  private boolean mkdir(final String bucket, final String objectName)
+      throws IOException {
+    String dirName = objectName;
+    ObjectMetadata dirMeta = new ObjectMetadata();
+    byte[] buffer = new byte[0];
+    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+    dirMeta.setContentLength(0);
+    if (!objectName.endsWith("/")) {
+      dirName += "/";
+    }
+    try {
+      ossClient.putObject(bucket, dirName, in, dirMeta);
+      return true;
+    } finally {
+      in.close();
+    }
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission)
+      throws IOException {
+    try {
+      FileStatus fileStatus = getFileStatus(path);
+
+      if (fileStatus.isDirectory()) {
+        return true;
+      } else {
+        throw new FileAlreadyExistsException("Path is a file: " + path);
+      }
+    } catch (FileNotFoundException e) {
+      validatePath(path);
+      String key = pathToKey(path);
+      return mkdir(bucketName, key);
+    }
+  }
+
+  /**
+   * Check that the nearest existing ancestor of the path is a directory.
+   *
+   * @param path the path to be checked
+   * @throws IOException if that ancestor is a file
+   */
+  private void validatePath(Path path) throws IOException {
+    Path fPart = path.getParent();
+    do {
+      try {
+        FileStatus fileStatus = getFileStatus(fPart);
+        if (fileStatus.isDirectory()) {
+          // If path exists and a directory, exit
+          break;
+        } else {
+          throw new FileAlreadyExistsException(String.format(
+              "Can't make directory for path '%s', it is a file.", fPart));
+        }
+      } catch (FileNotFoundException fnfe) { // not there; try its parent
+      }
+      fPart = fPart.getParent();
+    } while (fPart != null);
+  }
+
+  /**
+   * Open a file for reading.
+   * The {@code bufferSize} argument is not used by this implementation.
+   *
+   * @param path file to open
+   * @param bufferSize requested buffer size (ignored here)
+   * @return a stream over the object that backs {@code path}
+   * @throws FileNotFoundException if {@code path} is a directory
+   * @throws IOException if the status lookup or stream creation fails
+   */
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    final FileStatus status = getFileStatus(path);
+    if (!status.isDirectory()) {
+      final AliyunOSSInputStream ossIn = new AliyunOSSInputStream(getConf(),
+          ossClient, bucketName, pathToKey(path), status.getLen(), statistics);
+      return new FSDataInputStream(ossIn);
+    }
+    throw new FileNotFoundException("Can't open " + path +
+        " because it is a directory");
+  }
+
+  /**
+   * Rename a file or directory by copying it to the destination and then
+   * deleting the source.
+   * <p>
+   * If the destination is an existing directory, the source is moved into
+   * it under its own name. The source is only deleted after the copy has
+   * reported success, so a failed copy never removes the original data.
+   *
+   * @param srcPath path to rename
+   * @param dstPath new path, or an existing directory to move into
+   * @return true if the rename completed; false if the source is the root,
+   *         the destination lies below the source, the copy failed, or
+   *         the source could not be deleted
+   * @throws FileAlreadyExistsException if the destination is an existing
+   *         file, or the target inside an existing directory is not empty
+   * @throws IOException if the source is missing, the destination's parent
+   *         is missing or is a file, or a status lookup fails
+   */
+  @Override
+  public boolean rename(Path srcPath, Path dstPath) throws IOException {
+    if (srcPath.isRoot()) {
+      // Cannot rename root of file system
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot rename the root of a filesystem");
+      }
+      return false;
+    }
+    // Walk up from the destination; finding the source among its ancestors
+    // means the destination lies below the source.
+    Path parent = dstPath.getParent();
+    while (parent != null && !srcPath.equals(parent)) {
+      parent = parent.getParent();
+    }
+    if (parent != null) {
+      return false;
+    }
+    FileStatus srcStatus = getFileStatus(srcPath);
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dstPath);
+    } catch (FileNotFoundException fnde) {
+      dstStatus = null;
+    }
+    if (dstStatus == null) {
+      // If dst doesn't exist, check whether dst dir exists or not
+      dstStatus = getFileStatus(dstPath.getParent());
+      if (!dstStatus.isDirectory()) {
+        throw new IOException(String.format(
+            "Failed to rename %s to %s, %s is a file", srcPath, dstPath,
+            dstPath.getParent()));
+      }
+    } else {
+      if (srcStatus.getPath().equals(dstStatus.getPath())) {
+        return !srcStatus.isDirectory();
+      } else if (dstStatus.isDirectory()) {
+        // If dst is a directory
+        dstPath = new Path(dstPath, srcPath.getName());
+        FileStatus[] statuses;
+        try {
+          statuses = listStatus(dstPath);
+        } catch (FileNotFoundException fnde) {
+          statuses = null;
+        }
+        if (statuses != null && statuses.length > 0) {
+          // If dst exists and not a directory / not empty
+          throw new FileAlreadyExistsException(String.format(
+              "Failed to rename %s to %s, file already exists or not empty!",
+              srcPath, dstPath));
+        }
+      } else {
+        // If dst is not a directory
+        throw new FileAlreadyExistsException(String.format(
+            "Failed to rename %s to %s, file already exists!", srcPath,
+            dstPath));
+      }
+    }
+    final boolean copied;
+    if (srcStatus.isDirectory()) {
+      copied = copyDirectory(srcPath, dstPath);
+    } else {
+      copied = copyFile(srcPath, dstPath);
+    }
+    if (srcPath.equals(dstPath)) {
+      return true;
+    }
+    // Deleting the source after a failed copy would lose the data, so the
+    // delete is only attempted when the copy reported success.
+    return copied && delete(srcPath, true);
+  }
+
+  /**
+   * Copy a file, addressed by path, to a destination path.
+   * Both paths are turned into object keys and handed to the key-based
+   * overload. The caller must make sure srcPath is a file and dstPath is
+   * valid.
+   *
+   * @param srcPath source path
+   * @param dstPath destination path
+   * @return true if successfully copied
+   */
+  private boolean copyFile(Path srcPath, Path dstPath) {
+    return copyFile(pathToKey(srcPath), pathToKey(dstPath));
+  }
+
+  /**
+   * Copy an object from source key to destination key.
+   *
+   * @param srcKey source key
+   * @param dstKey destination key
+   * @return true if successfully copied
+   */
+  private boolean copyFile(String srcKey, String dstKey) {
+    ObjectMetadata objectMeta =
+        ossClient.getObjectMetadata(bucketName, srcKey);
+    long dataLen = objectMeta.getContentLength();
+    if (dataLen <= multipartThreshold) {
+      return singleCopy(srcKey, dstKey);
+    } else {
+      return multipartCopy(srcKey, dataLen, dstKey);
+    }
+  }
+
+  /**
+   * Use single copy to copy an oss object.
+   *
+   * @param srcKey source key
+   * @param dstKey destination key
+   * @return true if successfully copied
+   * (the caller should make sure srcPath is a file and dstPath is valid)
+   */
+  private boolean singleCopy(String srcKey, String dstKey) {
+    CopyObjectResult copyResult =
+        ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
+    LOG.debug(copyResult.getETag());
+    return true;
+  }
+
+  /**
+   * Use multipart copy to copy an oss object.
+   * The object is copied in parts of at most {@code uploadPartSize} bytes.
+   * If any step fails, the cause is logged and the multipart upload is
+   * aborted. The caller must make sure the source is a file and the
+   * destination is valid.
+   *
+   * @param srcKey source key
+   * @param dataLen data size of the object to copy
+   * @param dstKey destination key
+   * @return true if successfully copied, or false if upload is aborted
+   */
+  private boolean multipartCopy(String srcKey, long dataLen, String dstKey) {
+    // Number of parts, rounded up so a trailing partial part is included.
+    int partNum = (int)(dataLen / uploadPartSize);
+    if (dataLen % uploadPartSize != 0) {
+      partNum++;
+    }
+    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+        new InitiateMultipartUploadRequest(bucketName, dstKey);
+    ObjectMetadata meta = new ObjectMetadata();
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    initiateMultipartUploadRequest.setObjectMetadata(meta);
+    InitiateMultipartUploadResult initiateMultipartUploadResult =
+        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+    String uploadId = initiateMultipartUploadResult.getUploadId();
+    List<PartETag> partETags = new ArrayList<PartETag>();
+    try {
+      for (int i = 0; i < partNum; i++) {
+        long skipBytes = uploadPartSize * i;
+        // The last part only covers what is left of the object.
+        long size = (uploadPartSize < dataLen - skipBytes) ?
+            uploadPartSize : dataLen - skipBytes;
+        UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
+        partCopyRequest.setSourceBucketName(bucketName);
+        partCopyRequest.setSourceKey(srcKey);
+        partCopyRequest.setBucketName(bucketName);
+        partCopyRequest.setKey(dstKey);
+        partCopyRequest.setUploadId(uploadId);
+        partCopyRequest.setPartSize(size);
+        partCopyRequest.setBeginIndex(skipBytes);
+        partCopyRequest.setPartNumber(i + 1);
+        UploadPartCopyResult partCopyResult =
+            ossClient.uploadPartCopy(partCopyRequest);
+        statistics.incrementWriteOps(1);
+        partETags.add(partCopyResult.getPartETag());
+      }
+      CompleteMultipartUploadRequest completeMultipartUploadRequest =
+          new CompleteMultipartUploadRequest(bucketName, dstKey,
+          uploadId, partETags);
+      CompleteMultipartUploadResult completeMultipartUploadResult =
+          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+      LOG.debug(completeMultipartUploadResult.getETag());
+      return true;
+    } catch (Exception e) {
+      // Record why the copy failed; the caller only sees "false".
+      LOG.warn("Multipart copy from " + srcKey + " to " + dstKey +
+          " failed, aborting upload " + uploadId, e);
+      AbortMultipartUploadRequest abortMultipartUploadRequest =
+          new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
+      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+      return false;
+    }
+  }
+
+  /**
+   * Copy a directory from source path to destination path.
+   * Every object whose key starts with the source directory key is copied
+   * to the same relative key under the destination directory. The listing
+   * is fetched page by page. A failed object copy does not stop the
+   * remaining copies but makes the method return false. The caller must
+   * make sure srcPath is a directory and dstPath is valid.
+   *
+   * @param srcPath source path
+   * @param dstPath destination path
+   * @return true if every object was copied; false if the destination lies
+   *         inside the source or at least one object copy failed
+   */
+  private boolean copyDirectory(Path srcPath, Path dstPath) {
+    String srcKey = pathToKey(srcPath);
+    String dstKey = pathToKey(dstPath);
+
+    if (!srcKey.endsWith("/")) {
+      srcKey = srcKey + "/";
+    }
+    if (!dstKey.endsWith("/")) {
+      dstKey = dstKey + "/";
+    }
+
+    if (dstKey.startsWith(srcKey)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot rename a directory to a subdirectory of self");
+      }
+      return false;
+    }
+
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName);
+    listObjectsRequest.setPrefix(srcKey);
+    listObjectsRequest.setMaxKeys(maxKeys);
+
+    ObjectListing objects = ossClient.listObjects(listObjectsRequest);
+    statistics.incrementReadOps(1);
+    boolean allCopied = true;
+    // Copy files from src folder to dst
+    while (true) {
+      for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+        String newKey =
+            dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
+        if (!copyFile(objectSummary.getKey(), newKey)) {
+          allCopied = false;
+        }
+      }
+      if (objects.isTruncated()) {
+        // Fetch the next page; without this the loop would process the
+        // first page forever.
+        listObjectsRequest.setMarker(objects.getNextMarker());
+        objects = ossClient.listObjects(listObjectsRequest);
+        statistics.incrementReadOps(1);
+      } else {
+        break;
+      }
+    }
+    return allCopied;
+  }
+
+  /**
+   * Remember the given path as the working directory of this file system.
+   *
+   * @param dir the new working directory, stored as passed in
+   */
+  @Override
+  public void setWorkingDirectory(Path dir) {
+    workingDir = dir;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
new file mode 100644
index 0000000..bcd00dc
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.model.GetObjectRequest;
+
+/**
+ * The input stream for OSS blob system.
+ * The object is read through ranged GET requests of at most
+ * {@code downloadPartSize} bytes each. A new range is opened when the
+ * current one has been consumed, when a seek leaves it, or when a read
+ * fails and is retried.
+ */
+public class AliyunOSSInputStream extends FSInputStream {
+  public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
+  /** Read attempts made before a failure is reported to the caller. */
+  private static final int MAX_RETRIES = 10;
+  /** Upper bound for the size of one ranged request. */
+  private final long downloadPartSize;
+
+  private String bucketName;
+  private String key;
+  private OSSClient ossClient;
+  private Statistics statistics;
+  private boolean closed;
+  /** Content stream of the range that is currently open. */
+  private InputStream wrappedStream = null;
+  /** Total length of the object. */
+  private long dataLen;
+  /** Offset of the next byte to read, counted from the object start. */
+  private long position;
+  /** Bytes of the current range that have not been consumed yet. */
+  private long partRemaining;
+
+  /**
+   * Create the stream and open the first range at offset 0.
+   *
+   * @param conf configuration supplying the download part size
+   * @param client OSS client used for the ranged requests
+   * @param bucketName bucket that holds the object
+   * @param key key of the object
+   * @param dataLen total length of the object
+   * @param statistics read statistics to update, may be null
+   * @throws IOException if the first range cannot be opened
+   */
+  public AliyunOSSInputStream(Configuration conf, OSSClient client,
+      String bucketName, String key, Long dataLen, Statistics statistics)
+      throws IOException {
+    this.bucketName = bucketName;
+    this.key = key;
+    ossClient = client;
+    this.statistics = statistics;
+    this.dataLen = dataLen;
+    downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
+        MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    reopen(0);
+    closed = false;
+  }
+
+  /**
+   * Reopen the wrapped stream at give position, by seeking for
+   * data of a part length from object content stream.
+   *
+   * @param pos position from start of a file
+   * @throws IOException if failed to reopen
+   */
+  private synchronized void reopen(long pos) throws IOException {
+
+    long partLen;
+
+    if (pos < 0) {
+      throw new EOFException("Cannot seek at negtive position:" + pos);
+    } else if (pos > dataLen) {
+      throw new EOFException("Cannot seek after EOF, fileLen:" + dataLen +
+          " position:" + pos);
+    } else if (pos + downloadPartSize > dataLen) {
+      // Less than a full part is left: request only what remains.
+      partLen = dataLen - pos;
+    } else {
+      partLen = downloadPartSize;
+    }
+
+    if (wrappedStream != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Aborting old stream to open at pos " + pos);
+      }
+      wrappedStream.close();
+    }
+
+    GetObjectRequest request = new GetObjectRequest(bucketName, key);
+    request.setRange(pos, pos + partLen - 1);
+    wrappedStream = ossClient.getObject(request).getObjectContent();
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream");
+    }
+    position = pos;
+    partRemaining = partLen;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    checkNotClosed();
+
+    if (partRemaining <= 0 && position < dataLen) {
+      reopen(position);
+    }
+
+    int tries = MAX_RETRIES;
+    boolean retry;
+    int byteRead = -1;
+    do {
+      retry = false;
+      try {
+        byteRead = wrappedStream.read();
+      } catch (Exception e) {
+        // Throws once the retries are used up, otherwise reopens the range.
+        handleReadException(e, --tries);
+        retry = true;
+      }
+    } while (retry);
+    if (byteRead >= 0) {
+      position++;
+      partRemaining--;
+    }
+
+    if (statistics != null && byteRead >= 0) {
+      statistics.incrementBytesRead(1);
+    }
+    return byteRead;
+  }
+
+
+  /**
+   * Check whether the input stream is closed.
+   *
+   * @throws IOException if stream is closed
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException("Stream is closed!");
+    }
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    checkNotClosed();
+
+    if (buf == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > buf.length - off) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    int bytesRead = 0;
+    // Not EOF, and read not done
+    while (position < dataLen && bytesRead < len) {
+      if (partRemaining == 0) {
+        reopen(position);
+      }
+
+      int tries = MAX_RETRIES;
+      boolean retry;
+      int bytes = -1;
+      do {
+        retry = false;
+        try {
+          bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+        } catch (Exception e) {
+          // Throws once the retries are used up, otherwise reopens the range.
+          handleReadException(e, --tries);
+          retry = true;
+        }
+      } while (retry);
+
+      if (bytes > 0) {
+        bytesRead += bytes;
+        position += bytes;
+        partRemaining -= bytes;
+      } else if (partRemaining != 0) {
+        // The range ended before delivering all the bytes it should hold.
+        throw new IOException("Failed to read from stream. Remaining:" +
+            partRemaining);
+      }
+    }
+
+    if (statistics != null && bytesRead > 0) {
+      statistics.incrementBytesRead(bytesRead);
+    }
+
+    // Read nothing, but attempt to read something
+    if (bytesRead == 0 && len > 0) {
+      return -1;
+    } else {
+      return bytesRead;
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (wrappedStream != null) {
+      wrappedStream.close();
+    }
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+
+    long remaining = dataLen - position;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int)remaining;
+  }
+
+  /**
+   * Move the read position.
+   * A forward seek that stays inside the currently open range skips within
+   * that range; any other target opens a new range at {@code pos}.
+   *
+   * @param pos target offset from the start of the object
+   * @throws IOException if the stream is closed or the range cannot be
+   *         reopened (including a target outside the object)
+   */
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    checkNotClosed();
+    if (position == pos) {
+      return;
+    } else if (pos > position && pos < position + partRemaining) {
+      long toSkip = pos - position;
+      // skip() may skip fewer bytes than asked for.
+      long skipped = wrappedStream.skip(toSkip);
+      if (skipped == toSkip) {
+        position = pos;
+        // Skipped bytes are consumed from the current range as well;
+        // otherwise later reads expect more data than the range holds.
+        partRemaining -= toSkip;
+      } else {
+        // Short skip: the stream offset is uncertain, so start a new range.
+        reopen(pos);
+      }
+    } else {
+      reopen(pos);
+    }
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkNotClosed();
+    return position;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkNotClosed();
+    return false;
+  }
+
+  /**
+   * React to a failed read: give up when no retries are left, otherwise
+   * wait briefly and reopen the range at the current position.
+   *
+   * @param e the failure raised by the wrapped stream
+   * @param tries retries still available after this failure
+   * @throws IOException wrapping {@code e} when {@code tries} is 0, or if
+   *         reopening fails
+   */
+  private void handleReadException(Exception e, int tries) throws IOException{
+    if (tries == 0) {
+      throw new IOException(e);
+    }
+
+    LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
+        " connection at position '" + position + "', " + e.getMessage());
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e2) {
+      LOG.warn(e2.getMessage());
+      // Keep the interrupt visible to the caller instead of dropping it.
+      Thread.currentThread().interrupt();
+    }
+    reopen(position);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
new file mode 100644
index 0000000..589e014
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.model.AbortMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.InitiateMultipartUploadRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadResult;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PartETag;
+import com.aliyun.oss.model.PutObjectResult;
+import com.aliyun.oss.model.UploadPartRequest;
+import com.aliyun.oss.model.UploadPartResult;
+
+/**
+ * The output stream for OSS blob system.
+ * Data will be buffered on local disk, then uploaded to OSS in
+ * {@link #close()} method. Files up to the multipart threshold are sent
+ * with a single put, larger ones with a multipart upload.
+ */
+public class AliyunOSSOutputStream extends OutputStream {
+  public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
+  private String bucketName;
+  private String key;
+  private Statistics statistics;
+  private Progressable progress;
+  private String serverSideEncryptionAlgorithm;
+  /** Size of one part of a multipart upload. */
+  private long partSize;
+  /** Largest data length that is still uploaded with a single put. */
+  private long partSizeThreshold;
+  private LocalDirAllocator dirAlloc;
+  private boolean closed;
+  /** Local file that buffers everything written before the upload. */
+  private File tmpFile;
+  private BufferedOutputStream backupStream;
+  private OSSClient ossClient;
+
+  /**
+   * Create the stream and its local buffer file.
+   * If no buffer directory is configured, one below {@code hadoop.tmp.dir}
+   * is set on {@code conf}.
+   *
+   * @param conf configuration supplying sizes and the buffer directory
+   * @param client OSS client used for the upload
+   * @param bucketName bucket to upload to
+   * @param key key of the object to create
+   * @param progress progress callback; stored but not invoked here
+   * @param statistics write statistics to update
+   * @param serverSideEncryptionAlgorithm encryption algorithm to request;
+   *        an empty string requests none
+   * @throws IOException if the local buffer file cannot be created
+   */
+  public AliyunOSSOutputStream(Configuration conf, OSSClient client,
+      String bucketName, String key, Progressable progress,
+      Statistics statistics, String serverSideEncryptionAlgorithm)
+      throws IOException {
+    this.bucketName = bucketName;
+    this.key = key;
+    // The caller can't get any progress information
+    this.progress = progress;
+    ossClient = client;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+
+    partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
+    }
+    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+
+    tmpFile = dirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
+    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+    closed = false;
+  }
+
+  /**
+   * Flush the local buffer, upload it to OSS and delete the local file.
+   * Calling this more than once has no further effect.
+   *
+   * @throws IOException if the buffer cannot be closed or the upload fails
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      // Closing inside the try makes sure the temporary file is removed
+      // even when the final flush to disk fails.
+      if (backupStream != null) {
+        backupStream.close();
+      }
+      long dataLen = tmpFile.length();
+      if (dataLen <= partSizeThreshold) {
+        uploadObject();
+      } else {
+        multipartUploadObject();
+      }
+    } finally {
+      if (!tmpFile.delete()) {
+        LOG.warn("Could not delete temporary file " + tmpFile);
+      }
+    }
+  }
+
+  /**
+   * Upload temporary file as an OSS object, using single upload.
+   *
+   * @throws IOException if the temporary file cannot be read
+   */
+  private void uploadObject() throws IOException {
+    File object = tmpFile.getAbsoluteFile();
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(object.length());
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    try (FileInputStream fis = new FileInputStream(object)) {
+      PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
+      LOG.debug(result.getETag());
+      statistics.incrementWriteOps(1);
+    }
+  }
+
+  /**
+   * Upload temporary file as an OSS object, using multipart upload.
+   * If any part fails, the upload is aborted and the failure is reported
+   * to the caller instead of being dropped.
+   *
+   * @throws IOException if the file needs more parts than allowed, or if
+   *         the upload fails
+   */
+  private void multipartUploadObject() throws IOException {
+    File object = tmpFile.getAbsoluteFile();
+    long dataLen = object.length();
+    // Number of parts, rounded up so a trailing partial part is included.
+    int partNum = (int)(dataLen / partSize);
+    if (dataLen % partSize != 0) {
+      partNum += 1;
+    }
+    // Checked before the upload is initiated, so a rejected file does not
+    // leave an unfinished multipart upload behind.
+    if (partNum > MULTIPART_UPLOAD_PART_NUM_LIMIT) {
+      throw new IOException("Number of parts " + partNum + " should not be " +
+          "bigger than limit " + MULTIPART_UPLOAD_PART_NUM_LIMIT);
+    }
+    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+        new InitiateMultipartUploadRequest(bucketName, key);
+    ObjectMetadata meta = new ObjectMetadata();
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    initiateMultipartUploadRequest.setObjectMetadata(meta);
+    InitiateMultipartUploadResult initiateMultipartUploadResult =
+        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+    List<PartETag> partETags = new ArrayList<PartETag>();
+    String uploadId = initiateMultipartUploadResult.getUploadId();
+
+    try {
+      for (int i = 0; i < partNum; i++) {
+        // TODO: Optimize this, avoid opening the object multiple times
+        try (FileInputStream fis = new FileInputStream(object)) {
+          long skipBytes = partSize * i;
+          // Position through the channel; skip() may skip fewer bytes
+          // than requested and would then upload the wrong range.
+          fis.getChannel().position(skipBytes);
+          // The last part only covers what is left of the file.
+          long size = (partSize < dataLen - skipBytes) ?
+              partSize : dataLen - skipBytes;
+          UploadPartRequest uploadPartRequest = new UploadPartRequest();
+          uploadPartRequest.setBucketName(bucketName);
+          uploadPartRequest.setKey(key);
+          uploadPartRequest.setUploadId(uploadId);
+          uploadPartRequest.setInputStream(fis);
+          uploadPartRequest.setPartSize(size);
+          uploadPartRequest.setPartNumber(i + 1);
+          UploadPartResult uploadPartResult =
+              ossClient.uploadPart(uploadPartRequest);
+          statistics.incrementWriteOps(1);
+          partETags.add(uploadPartResult.getPartETag());
+        }
+      }
+      CompleteMultipartUploadRequest completeMultipartUploadRequest =
+          new CompleteMultipartUploadRequest(bucketName, key,
+          uploadId, partETags);
+      CompleteMultipartUploadResult completeMultipartUploadResult =
+          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+      LOG.debug(completeMultipartUploadResult.getETag());
+    } catch (Exception e) {
+      try {
+        AbortMultipartUploadRequest abortMultipartUploadRequest =
+            new AbortMultipartUploadRequest(bucketName, key, uploadId);
+        ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+      } catch (Exception abortFailure) {
+        // Do not let a failed abort hide the original upload failure.
+        LOG.warn("Failed to abort multipart upload " + uploadId + " of " +
+            key, abortFailure);
+      }
+      // Swallowing the failure would let close() succeed although no
+      // object was written.
+      throw new IOException("Multipart upload of " + key + " failed", e);
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    backupStream.flush();
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    backupStream.write(b);
+    statistics.incrementBytesWritten(1);
+  }
+
+  /**
+   * Write a range of bytes to the local buffer in one call instead of
+   * byte by byte.
+   *
+   * @param b source array
+   * @param off offset of the first byte to write
+   * @param len number of bytes to write
+   * @throws IOException if writing to the local buffer fails
+   */
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    backupStream.write(b, off, len);
+    statistics.incrementBytesWritten(len);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
new file mode 100644
index 0000000..3f66a4f
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.Objects;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Static helpers shared by the Aliyun OSS file system classes.
+ */
+final public class AliyunOSSUtils {
+  private AliyunOSSUtils() {
+  }
+
+  /**
+   * Immutable pair of user name and password.
+   */
+  static public class UserInfo {
+    private final String user;
+    private final String password;
+
+    /** Instance with an empty user name and an empty password. */
+    public static final UserInfo EMPTY = new UserInfo("", "");
+
+    public UserInfo(String user, String password) {
+      this.user = user;
+      this.password = password;
+    }
+
+    /**
+     * Tell whether a user name is present.
+     * @return true if the user name is neither null nor empty.
+     */
+    public boolean hasLogin() {
+      return StringUtils.isNotEmpty(user);
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and both
+     * user and password match.
+     * @param o other object
+     * @return true if the objects are considered equivalent.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o != null && o.getClass() == getClass()) {
+        final UserInfo other = (UserInfo) o;
+        return Objects.equals(user, other.user) &&
+            Objects.equals(password, other.password);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(user, password);
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public String getPassword() {
+      return password;
+    }
+  }
+
+  /**
+   * Resolve a password: a non-empty {@code val} is returned as is,
+   * otherwise the password is looked up in the configuration.
+   * @param conf configuration that contains password information
+   * @param key the key of the password
+   * @param val the value to use when it is not empty
+   * @return {@code val} if not empty; else the trimmed configured password,
+   *         or an empty string if the configuration has none
+   * @throws IOException if failed to get password from configuration
+   */
+  static public String getPassword(Configuration conf, String key, String val)
+      throws IOException {
+    if (!StringUtils.isEmpty(val)) {
+      return val;
+    }
+    final char[] configured;
+    try {
+      configured = conf.getPassword(key);
+    } catch (IOException ioe) {
+      throw new IOException("Cannot find password option " + key, ioe);
+    }
+    return configured == null ? "" : (new String(configured)).trim();
+  }
+
+  /**
+   * Pull the "user:password" part out of the authority of a URI.
+   * Only the password is URL-decoded.
+   * @param name URI of the filesystem
+   * @return the login found; {@link UserInfo#EMPTY} if the URI has no
+   *         authority, no '@', or a login that starts with ':'.
+   */
+  public static UserInfo extractLoginDetails(URI name) {
+    final String authority = name.getAuthority();
+    if (authority == null) {
+      return UserInfo.EMPTY;
+    }
+    final int atIndex = authority.indexOf('@');
+    if (atIndex < 0) {
+      // No user information
+      return UserInfo.EMPTY;
+    }
+    final String login = authority.substring(0, atIndex);
+    final int colonIndex = login.indexOf(':');
+    if (colonIndex == 0) {
+      // There is no user, just a password.
+      return UserInfo.EMPTY;
+    }
+    if (colonIndex < 0) {
+      // A user name without any password.
+      return new UserInfo(login, "");
+    }
+    final String user = login.substring(0, colonIndex);
+    final String password;
+    try {
+      password = URLDecoder.decode(login.substring(colonIndex + 1), "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // This should never happen; translate it if it does.
+      throw new RuntimeException(e);
+    }
+    return new UserInfo(user, password);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
new file mode 100644
index 0000000..4ee4cd4
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+/**
+ * All configuration constants for the OSS filesystem: the {@code fs.oss.*}
+ * property names and, where one is defined, the matching default value.
+ */
+public final class Constants {
+
+  // Constant holder only; never instantiated.
+  private Constants() {
+  }
+
+  // Class of credential provider
+  public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
+      "fs.oss.credentials.provider";
+
+  // OSS access verification
+  public static final String ACCESS_KEY = "fs.oss.access.key";
+  public static final String SECRET_KEY = "fs.oss.secret.key";
+
+  // Number of simultaneous connections to oss
+  public static final String MAXIMUM_CONNECTIONS_KEY =
+      "fs.oss.connection.maximum";
+  public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
+
+  // Connect to oss over ssl
+  public static final String SECURE_CONNECTIONS_KEY =
+      "fs.oss.connection.secure.enabled";
+  public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
+
+  // Use a custom endpoint
+  public static final String ENDPOINT_KEY = "fs.oss.endpoint";
+
+  // Connect to oss through a proxy server
+  public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
+  public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
+  public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
+  public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
+  public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
+  public static final String PROXY_WORKSTATION_KEY =
+      "fs.oss.proxy.workstation";
+
+  // Number of times we should retry errors
+  public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
+  public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
+
+  // Time until we give up trying to establish a connection to oss
+  // NOTE(review): unit is presumably milliseconds - confirm against the
+  // code that consumes this setting.
+  public static final String ESTABLISH_TIMEOUT_KEY =
+      "fs.oss.connection.establish.timeout";
+  public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
+
+  // Time until we give up on a connection to oss
+  // NOTE(review): unit is presumably milliseconds - confirm.
+  public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
+  public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
+
+  // Number of records to get while paging through a directory listing
+  public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
+  public static final int MAX_PAGING_KEYS_DEFAULT = 500;
+
+  // Size of each of our multipart pieces in bytes
+  public static final String MULTIPART_UPLOAD_SIZE_KEY =
+      "fs.oss.multipart.upload.size";
+
+  // Default part size: 10 MB.
+  public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
+  // NOTE(review): by its name, the maximum number of parts in one multipart
+  // upload - confirm where it is enforced.
+  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 1000;
+
+  // Minimum size in bytes before we start a multipart upload or copy
+  public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
+      "fs.oss.multipart.upload.threshold";
+  // Default threshold: 20 MB.
+  public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
+      20 * 1024 * 1024;
+
+  // NOTE(review): presumably the size in bytes of each ranged read when
+  // downloading - confirm against the input stream implementation.
+  public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
+      "fs.oss.multipart.download.size";
+
+  // Default download size: 100 KB.
+  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+
+  // Comma separated list of directories
+  public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
+
+  // private | public-read | public-read-write | authenticated-read |
+  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
+  public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
+  // Empty string by default, i.e. no canned ACL name is configured.
+  public static final String CANNED_ACL_DEFAULT = "";
+
+  // OSS server-side encryption
+  public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
+      "fs.oss.server-side-encryption-algorithm";
+
+  // Block size setting; the default is 64 MB.
+  public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
+  public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
+  // URI scheme name of this filesystem.
+  public static final String FS_OSS = "oss";
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
new file mode 100644
index 0000000..234567b
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Aliyun OSS Filesystem.
+ */
+package org.apache.hadoop.fs.aliyun.oss;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java
new file mode 100644
index 0000000..37ed831
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.internal.AssumptionViolatedException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * Utility class for OSS Tests.
+ */
+public final class OSSTestUtils {
+
+  private OSSTestUtils() {
+  }
+
+  /**
+   * Create the test filesystem.
+   *
+   * If the test.fs.oss.name property is not set, or its URI does not use
+   * the {@code oss} scheme, an {@link AssumptionViolatedException} is
+   * thrown so that runners which honor assumptions can skip the test.
+   *
+   * @param conf configuration
+   * @return the initialized FS
+   * @throws IOException if the filesystem cannot be initialized
+   * @throws AssumptionViolatedException if no OSS test filesystem is
+   * configured
+   */
+  public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
+      throws IOException {
+    String fsname = conf.getTrimmed(
+        TestOSSFileSystemContract.TEST_FS_OSS_NAME, "");
+
+    URI testURI = null;
+    boolean liveTest = false;
+    if (!StringUtils.isEmpty(fsname)) {
+      testURI = URI.create(fsname);
+      // Constant-first comparison: a name without a scheme has a null
+      // scheme, which must count as "not configured" instead of an NPE.
+      liveTest = Constants.FS_OSS.equals(testURI.getScheme());
+    }
+
+    if (!liveTest) {
+      throw new AssumptionViolatedException("No test filesystem in "
+          + TestOSSFileSystemContract.TEST_FS_OSS_NAME);
+    }
+    AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
+    ossfs.initialize(testURI, conf);
+    return ossfs;
+  }
+
+  /**
+   * Generate unique test path for multiple user tests.
+   *
+   * The path combines the current time in milliseconds with a
+   * non-negative random number.
+   *
+   * @return root test path
+   */
+  public static String generateUniqueTestPath() {
+    long time = new Date().getTime();
+    // Clear the sign bit rather than calling Math.abs():
+    // Math.abs(Long.MIN_VALUE) is still negative.
+    long suffix = new Random().nextLong() & Long.MAX_VALUE;
+    return "/test_" + time + "_" + suffix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java
new file mode 100644
index 0000000..de4e5a9
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Tests a live OSS system.
+ *
+ * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest
+ * extends TestCase, which uses the old JUnit3 runner that doesn't ignore
+ * assumptions properly, making it impossible to skip the tests if we don't
+ * have a valid bucket.
+ *
+ * NOTE(review): no runner annotation is visible on this class - confirm
+ * that the statement above still matches how the tests are executed.
+ *
+ * Every path handed to {@link #path(String)} is placed under a per-run
+ * unique root directory, which {@link #tearDown()} deletes recursively.
+ */
+public class TestOSSFileSystemContract extends FileSystemContractBaseTest {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(TestOSSFileSystemContract.class);
+
+  // Configuration property naming the OSS filesystem URI to test against.
+  public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
+  // Unique root for everything this test class creates.
+  private static String testRootPath = OSSTestUtils.generateUniqueTestPath();
+
+  /**
+   * Create the OSS test filesystem before running the base-class setup.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    fs = OSSTestUtils.createTestFileSystem(conf);
+    super.setUp();
+  }
+
+  /**
+   * Recursively delete the per-run test root, then run the base-class
+   * teardown.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      // super.path() is used so the root itself is not prefixed again.
+      fs.delete(super.path(testRootPath), true);
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Resolve a test path underneath the per-run test root.
+   *
+   * @param path path string, with or without a leading slash
+   * @return the base-class path for {@code testRootPath} plus {@code path}
+   */
+  @Override
+  protected Path path(String path) {
+    if (path.startsWith("/")) {
+      return super.path(testRootPath + path);
+    } else {
+      return super.path(testRootPath + "/" + path);
+    }
+  }
+
+  /**
+   * Overridden as a no-op: the umask test is not supported here.
+   *
+   * @throws Exception never, the body is empty
+   */
+  @Override
+  public void testMkdirsWithUmask() throws Exception {
+    // not supported
+  }
+
+  /**
+   * Assert that the root directory always exists.
+   *
+   * Uses super.path() so that the probe targets "/" itself and not the
+   * per-run test root.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRootDirAlwaysExists() throws Exception {
+    //this will throw an exception if the path is not found
+    fs.getFileStatus(super.path("/"));
+    //this catches overrides of the base exists() method that don't
+    //use getFileStatus() as an existence probe
+    assertTrue("FileSystem.exists() fails for root",
+        fs.exists(super.path("/")));
+  }
+
+  /**
+   * Assert that root directory renames are not allowed.
+   *
+   * The boolean arguments are interpreted by the base-class rename()
+   * helper, which is not shown in this file.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameRootDirForbidden() throws Exception {
+    if (!renameSupported()) {
+      return;
+    }
+    rename(super.path("/"),
+           super.path("/test/newRootDir"),
+           false, true, false);
+  }
+
+  /**
+   * Check that the parent directory still exists after a subdirectory and
+   * then a file inside it have been deleted.
+   *
+   * @throws IOException on failures
+   */
+  public void testDeleteSubdir() throws IOException {
+    Path parentDir = this.path("/test/hadoop");
+    Path file = this.path("/test/hadoop/file");
+    Path subdir = this.path("/test/hadoop/subdir");
+    this.createFile(file);
+
+    assertTrue("Created subdir", this.fs.mkdirs(subdir));
+    assertTrue("File exists", this.fs.exists(file));
+    assertTrue("Parent dir exists", this.fs.exists(parentDir));
+    assertTrue("Subdir exists", this.fs.exists(subdir));
+
+    assertTrue("Deleted subdir", this.fs.delete(subdir, true));
+    assertTrue("Parent should exist", this.fs.exists(parentDir));
+
+    assertTrue("Deleted file", this.fs.delete(file, false));
+    assertTrue("Parent should exist", this.fs.exists(parentDir));
+  }
+
+
+  /**
+   * Enable the rename tests for this filesystem.
+   *
+   * @return always true
+   */
+  @Override
+  protected boolean renameSupported() {
+    return true;
+  }
+
+  /**
+   * Renaming a source that was never created must throw
+   * FileNotFoundException.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameNonExistentPath() throws Exception {
+    if (this.renameSupported()) {
+      Path src = this.path("/test/hadoop/path");
+      Path dst = this.path("/test/new/newpath");
+      try {
+        super.rename(src, dst, false, false, false);
+        fail("Should throw FileNotFoundException!");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Renaming a file to a destination whose parent was never created must
+   * throw FileNotFoundException.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameFileMoveToNonExistentDirectory() throws Exception {
+    if (this.renameSupported()) {
+      Path src = this.path("/test/hadoop/file");
+      this.createFile(src);
+      Path dst = this.path("/test/new/newfile");
+      try {
+        super.rename(src, dst, false, true, false);
+        fail("Should throw FileNotFoundException!");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Renaming a directory to a destination whose parent was never created
+   * must throw FileNotFoundException.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
+    if (this.renameSupported()) {
+      Path src = this.path("/test/hadoop/dir");
+      this.fs.mkdirs(src);
+      Path dst = this.path("/test/new/newdir");
+      try {
+        super.rename(src, dst, false, true, false);
+        fail("Should throw FileNotFoundException!");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Delegates unchanged to the base-class test.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameFileMoveToExistingDirectory() throws Exception {
+    super.testRenameFileMoveToExistingDirectory();
+  }
+
+  /**
+   * Renaming a file onto an existing file must throw
+   * FileAlreadyExistsException.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameFileAsExistingFile() throws Exception {
+    if (this.renameSupported()) {
+      Path src = this.path("/test/hadoop/file");
+      this.createFile(src);
+      Path dst = this.path("/test/new/newfile");
+      this.createFile(dst);
+      try {
+        super.rename(src, dst, false, true, true);
+        fail("Should throw FileAlreadyExistsException");
+      } catch (FileAlreadyExistsException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Renaming a directory onto an existing file must throw
+   * FileAlreadyExistsException.
+   *
+   * @throws Exception on failures
+   */
+  @Override
+  public void testRenameDirectoryAsExistingFile() throws Exception {
+    if (this.renameSupported()) {
+      Path src = this.path("/test/hadoop/dir");
+      this.fs.mkdirs(src);
+      Path dst = this.path("/test/new/newfile");
+      this.createFile(dst);
+      try {
+        super.rename(src, dst, false, true, true);
+        fail("Should throw FileAlreadyExistsException");
+      } catch (FileAlreadyExistsException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Check that getFileStatus() reports a created file as a file and a
+   * created directory as a directory, and never as both.
+   *
+   * @throws Exception on failures
+   */
+  public void testGetFileStatusFileAndDirectory() throws Exception {
+    Path filePath = this.path("/test/oss/file1");
+    this.createFile(filePath);
+    assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile());
+    assertFalse("Should not be directory",
+        this.fs.getFileStatus(filePath).isDirectory());
+
+    Path dirPath = this.path("/test/oss/dir");
+    this.fs.mkdirs(dirPath);
+    assertTrue("Should be directory",
+        this.fs.getFileStatus(dirPath).isDirectory());
+    assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile());
+  }
+
+  /**
+   * Calling mkdirs() on a path that is already a file must throw
+   * FileAlreadyExistsException.
+   *
+   * @throws Exception on failures
+   */
+  public void testMkdirsForExistingFile() throws Exception {
+    Path testFile = this.path("/test/hadoop/file");
+    assertFalse(this.fs.exists(testFile));
+    this.createFile(testFile);
+    assertTrue(this.fs.exists(testFile));
+    try {
+      this.fs.mkdirs(testFile);
+      fail("Should throw FileAlreadyExistsException!");
+    } catch (FileAlreadyExistsException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Exercise setWorkingDirectory() with ".", "..", a relative name and an
+   * absolute path.
+   *
+   * All paths are built with super.path(), so they are not placed under
+   * the per-run test root.
+   *
+   * @throws Exception on failures
+   */
+  public void testWorkingDirectory() throws Exception {
+    Path workDir = super.path(this.getDefaultWorkingDirectory());
+    assertEquals(workDir, this.fs.getWorkingDirectory());
+    this.fs.setWorkingDirectory(super.path("."));
+    assertEquals(workDir, this.fs.getWorkingDirectory());
+    this.fs.setWorkingDirectory(super.path(".."));
+    assertEquals(workDir.getParent(), this.fs.getWorkingDirectory());
+    Path relativeDir = super.path("hadoop");
+    this.fs.setWorkingDirectory(relativeDir);
+    assertEquals(relativeDir, this.fs.getWorkingDirectory());
+    Path absoluteDir = super.path("/test/hadoop");
+    this.fs.setWorkingDirectory(absoluteDir);
+    assertEquals(absoluteDir, this.fs.getWorkingDirectory());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java
new file mode 100644
index 0000000..411cd57
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSInputStream.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests basic functionality for AliyunOSSInputStream, including seeking and
+ * reading files.
+ *
+ * All files are created under a per-run unique root directory that is
+ * deleted recursively after each test.
+ */
+public class TestOSSInputStream {
+
+  private FileSystem fs;
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(TestOSSInputStream.class);
+
+  private static String testRootPath = OSSTestUtils.generateUniqueTestPath();
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  /**
+   * Create the OSS test filesystem.
+   *
+   * @throws Exception on failures
+   */
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    fs = OSSTestUtils.createTestFileSystem(conf);
+  }
+
+  /**
+   * Recursively delete everything created under the test root.
+   *
+   * @throws Exception on failures
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(new Path(testRootPath), true);
+    }
+  }
+
+  /**
+   * Resolve a path underneath the per-run test root.
+   *
+   * @param path path string, with or without a leading slash
+   * @return the path prefixed with the test root
+   */
+  private Path setPath(String path) {
+    if (path.startsWith("/")) {
+      return new Path(testRootPath + path);
+    } else {
+      return new Path(testRootPath + "/" + path);
+    }
+  }
+
+  /**
+   * Seek to fixed fractions of a 5 MB file and then to random offsets,
+   * checking the reported position after every seek.
+   *
+   * @throws Exception on failures
+   */
+  @Test
+  public void testSeekFile() throws Exception {
+    Path smallSeekFile = setPath("/test/smallSeekFile.txt");
+    long size = 5 * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+    LOG.info("5MB file created: smallSeekFile.txt");
+
+    FSDataInputStream instream = this.fs.open(smallSeekFile);
+    // Close in finally so a failed assertion does not leak the stream.
+    try {
+      int seekTimes = 5;
+      LOG.info("multiple fold position seeking test...:");
+      for (int i = 0; i < seekTimes; i++) {
+        long pos = size / (seekTimes - i) - 1;
+        seekAndVerify(instream, pos);
+      }
+      LOG.info("random position seeking test...:");
+      Random rand = new Random();
+      for (int i = 0; i < seekTimes; i++) {
+        // Mask the sign bit instead of Math.abs(): Math.abs(Long.MIN_VALUE)
+        // is negative and would yield a negative seek position.
+        long pos = (rand.nextLong() & Long.MAX_VALUE) % size;
+        seekAndVerify(instream, pos);
+      }
+    } finally {
+      IOUtils.closeStream(instream);
+    }
+  }
+
+  /**
+   * Seek to the given offset and assert that the stream reports it.
+   *
+   * @param instream stream to seek in
+   * @param pos target offset
+   * @throws Exception on failures
+   */
+  private void seekAndVerify(FSDataInputStream instream, long pos)
+      throws Exception {
+    LOG.info("begin seeking for pos: " + pos);
+    instream.seek(pos);
+    assertTrue("expected position at:" + pos + ", but got:"
+        + instream.getPos(), instream.getPos() == pos);
+    LOG.info("completed seeking at pos: " + instream.getPos());
+  }
+
+  /**
+   * Read a 5 MB file in 256-byte chunks and, at every megabyte boundary,
+   * check that available() matches the number of bytes left.
+   *
+   * @throws Exception on failures
+   */
+  @Test
+  public void testReadFile() throws Exception {
+    final int bufLen = 256;
+    final int sizeFlag = 5;
+    String filename = "readTestFile_" + sizeFlag + ".txt";
+    Path readTestFile = setPath("/test/" + filename);
+    long size = sizeFlag * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255);
+    LOG.info(sizeFlag + "MB file created: /test/" + filename);
+
+    FSDataInputStream instream = this.fs.open(readTestFile);
+    // Close in finally so a failed assertion does not leak the stream.
+    try {
+      byte[] buf = new byte[bufLen];
+      long bytesRead = 0;
+      while (bytesRead < size) {
+        int toRead = (int) Math.min(bufLen, size - bytesRead);
+        int bytes = instream.read(buf, 0, toRead);
+        // read() returns -1 at end of stream; adding that to the counter
+        // would make this loop spin until the test timeout.
+        assertTrue("unexpected end of stream after " + bytesRead
+            + " bytes", bytes >= 0);
+        bytesRead += bytes;
+
+        if (bytesRead % (1024 * 1024) == 0) {
+          int available = instream.available();
+          int remaining = (int)(size - bytesRead);
+          assertTrue("expected remaining:" + remaining + ", but got:"
+              + available, remaining == available);
+          LOG.info("Bytes read: "
+              + Math.round((double)bytesRead / (1024 * 1024)) + " MB");
+        }
+      }
+      assertTrue(instream.available() == 0);
+    } finally {
+      IOUtils.closeStream(instream);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8346f922/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
new file mode 100644
index 0000000..3951529
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+
+/**
+ * Exercises AliyunOSSOutputStream through both upload paths: a regular
+ * upload and a multi-part upload.
+ */
+public class TestOSSOutputStream {
+  // One mebibyte, in bytes.
+  private static final int MB = 1024 * 1024;
+
+  // Per-run unique root under which the test file is written.
+  private static String rootDir = OSSTestUtils.generateUniqueTestPath();
+  private FileSystem fs;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  /**
+   * Build the test filesystem with the multipart threshold and part size
+   * both set to 5 MB.
+   */
+  @Before
+  public void setUp() throws Exception {
+    final int fiveMb = 5 * MB;
+    Configuration conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, fiveMb);
+    conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, fiveMb);
+    fs = OSSTestUtils.createTestFileSystem(conf);
+  }
+
+  /**
+   * Remove everything written under the test root.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (fs == null) {
+      return;
+    }
+    fs.delete(new Path(rootDir), true);
+  }
+
+  /**
+   * @return the single file path shared by both upload tests
+   */
+  protected Path getTestPath() {
+    return new Path(rootDir + "/testoss");
+  }
+
+  /**
+   * Write and verify a 1 MB file, below the configured 5 MB threshold.
+   */
+  @Test
+  public void testRegularUpload() throws IOException {
+    final int fileLen = MB;
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), fileLen);
+  }
+
+  /**
+   * Write and verify a 6 MB file, above the configured 5 MB threshold.
+   */
+  @Test
+  public void testMultiPartUpload() throws IOException {
+    final int fileLen = 6 * MB;
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), fileLen);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message