From common-commits-return-88495-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Mon Sep 24 20:45:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3625F180675 for ; Mon, 24 Sep 2018 20:45:31 +0200 (CEST) Received: (qmail 48435 invoked by uid 500); 24 Sep 2018 18:45:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 47828 invoked by uid 99); 24 Sep 2018 18:45:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Sep 2018 18:45:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 26C64E11E5; Mon, 24 Sep 2018 18:45:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: xkrogen@apache.org To: common-commits@hadoop.apache.org Date: Mon, 24 Sep 2018 18:45:43 -0000 Message-Id: <78b302f53cdb47169393e0923a897610@git.apache.org> In-Reply-To: <5a85de8c37134d7d9b8cee51e3172d0a@git.apache.org> References: <5a85de8c37134d7d9b8cee51e3172d0a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/50] [abbrv] hadoop git commit: HADOOP-15560. ABFS: removed dependency injection and unnecessary dependencies. Contributed by Da Zhou. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java deleted file mode 100644 index 9e4c27b..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.http.client.utils.URIBuilder; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; - -@Singleton -@InterfaceAudience.Private -@InterfaceStability.Evolving -class AbfsHttpClientFactoryImpl implements AbfsHttpClientFactory { - private final ConfigurationService configurationService; - - @Inject - AbfsHttpClientFactoryImpl( - final ConfigurationService configurationService) { - - Preconditions.checkNotNull(configurationService, "configurationService"); - - this.configurationService = configurationService; - } - - @VisibleForTesting - URIBuilder getURIBuilder(final String hostName, final FileSystem fs) { - final AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs; - - String scheme = FileSystemUriSchemes.HTTP_SCHEME; - - if (abfs.isSecure()) { - scheme = FileSystemUriSchemes.HTTPS_SCHEME; - } - - final URIBuilder uriBuilder = new URIBuilder(); - uriBuilder.setScheme(scheme); - uriBuilder.setHost(hostName); - - return uriBuilder; - } - - public AbfsClient create(final AzureBlobFileSystem fs) throws AzureBlobFileSystemException { - final URI uri = fs.getUri(); - final String authority = uri.getRawAuthority(); - if (null == authority) { - throw new InvalidUriAuthorityException(uri.toString()); - } - - if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) { - throw new InvalidUriAuthorityException(uri.toString()); - } - - final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2); - - if (authorityParts.length < 2 || "".equals(authorityParts[0])) { - final String errMsg = String - .format("URI '%s' has a malformed authority, expected container name. " - + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[@]", - uri.toString()); - throw new InvalidUriException(errMsg); - } - - final String fileSystemName = authorityParts[0]; - final String accountName = authorityParts[1]; - - final URIBuilder uriBuilder = getURIBuilder(accountName, fs); - - final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName; - - URL baseUrl; - try { - baseUrl = new URL(url); - } catch (MalformedURLException e) { - throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString())); - } - - SharedKeyCredentials creds = - new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)), - this.configurationService.getStorageAccountKey(accountName)); - - return new AbfsClient(baseUrl, creds, configurationService, new ExponentialRetryPolicy()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java deleted file mode 100644 index 06e1a8a..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java +++ /dev/null @@ -1,693 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import javax.xml.bind.DatatypeConverter; -import java.io.File; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Set; -import java.util.HashSet; -import java.util.Hashtable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; - -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.util.Time.now; - -@Singleton -@InterfaceAudience.Private -@InterfaceStability.Evolving -final class AbfsHttpServiceImpl implements AbfsHttpService { - public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class); - private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'"; - private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; - private static final int LIST_MAX_RESULTS = 5000; - private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000; - private static final int RENAME_TIMEOUT_MILISECONDS = 180000; - - private final AbfsHttpClientFactory abfsHttpClientFactory; - private final ConcurrentHashMap clientCache; - private final ConfigurationService configurationService; - private final Set azureAtomicRenameDirSet; - - @Inject - AbfsHttpServiceImpl( - final ConfigurationService configurationService, - final AbfsHttpClientFactory abfsHttpClientFactory, - final TracingService tracingService) { - Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory"); - Preconditions.checkNotNull(configurationService, "configurationService"); - Preconditions.checkNotNull(tracingService, "tracingService"); - - this.configurationService = configurationService; - this.clientCache = new ConcurrentHashMap<>(); - this.abfsHttpClientFactory = abfsHttpClientFactory; - this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(configurationService.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); - } - - @Override - public Hashtable getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem) - throws AzureBlobFileSystemException{ - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "getFilesystemProperties for filesystem: {}", - client.getFileSystem()); - - final Hashtable parsedXmsProperties; - - final AbfsRestOperation op = client.getFilesystemProperties(); - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); - - return parsedXmsProperties; - } - - @Override - public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable properties) throws - AzureBlobFileSystemException { - if (properties == null || properties.size() == 0) { - return; - } - - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "setFilesystemProperties for filesystem: {} with properties: {}", - client.getFileSystem(), - properties); - - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } - client.setFilesystemProperties(commaSeparatedProperties); - } - - @Override - public Hashtable getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws - AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "getPathProperties for filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - - final Hashtable parsedXmsProperties; - final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); - - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); - - return parsedXmsProperties; - } - - @Override - public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable properties) throws - AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "setFilesystemProperties for filesystem: {} path: {} with properties: {}", - client.getFileSystem(), - path.toString(), - properties); - - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } - client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties); - } - - @Override - public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "createFilesystem for filesystem: {}", - client.getFileSystem()); - - client.createFilesystem(); - } - - @Override - public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "deleteFilesystem for filesystem: {}", - client.getFileSystem()); - - client.deleteFilesystem(); - } - - @Override - public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws - AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "createFile filesystem: {} path: {} overwrite: {}", - client.getFileSystem(), - path.toString(), - overwrite); - - client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite); - - final OutputStream outputStream; - outputStream = new FSDataOutputStream( - new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - configurationService.getWriteBufferSize()), null); - return outputStream; - } - - @Override - public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "createDirectory filesystem: {} path: {} overwrite: {}", - client.getFileSystem(), - path.toString()); - - client.createPath("/" + getRelativePath(path), false, true); - - return null; - } - - @Override - public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path, - final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { - final AbfsClient client = getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "openFileForRead filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - - final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); - - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - - if (parseIsDirectory(resourceType)) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); - } - - // Add statistics for InputStream - return new FSDataInputStream( - new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag)); - } - - @Override - public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws - AzureBlobFileSystemException { - final AbfsClient client = getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "openFileForWrite filesystem: {} path: {} overwrite: {}", - client.getFileSystem(), - path.toString(), - overwrite); - - final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); - - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - - if (parseIsDirectory(resourceType)) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); - } - - final long offset = overwrite ? 0 : contentLength; - - final OutputStream outputStream; - outputStream = new FSDataOutputStream( - new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, configurationService.getWriteBufferSize()), null); - return outputStream; - } - - @Override - public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws - AzureBlobFileSystemException { - - if (isAtomicRenameKey(source.getName())) { - this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename," - +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account."); - } - - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "renameAsync filesystem: {} source: {} destination: {}", - client.getFileSystem(), - source.toString(), - destination.toString()); - - String continuation = null; - long deadline = now() + RENAME_TIMEOUT_MILISECONDS; - - do { - if (now() > deadline) { - LOG.debug( - "Rename {} to {} timed out.", - source, - destination); - - throw new TimeoutException("Rename timed out."); - } - - AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); - - } while (continuation != null && !continuation.isEmpty()); - } - - @Override - public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws - AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "delete filesystem: {} path: {} recursive: {}", - client.getFileSystem(), - path.toString(), - String.valueOf(recursive)); - - String continuation = null; - long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS; - - do { - if (now() > deadline) { - this.LOG.debug( - "Delete directory {} timed out.", path); - - throw new TimeoutException("Delete directory timed out."); - } - - AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); - - } while (continuation != null && !continuation.isEmpty()); - } - - @Override - public FileStatus getFileStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "getFileStatus filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - - if (path.isRoot()) { - AbfsRestOperation op = client.getFilesystemProperties(); - final long blockSize = configurationService.getAzureBlockSize(); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); - return new VersionedFileStatus( - azureBlobFileSystem.getOwnerUser(), - azureBlobFileSystem.getOwnerUserPrimaryGroup(), - 0, - true, - 1, - blockSize, - parseLastModifiedTime(lastModified).getMillis(), - path, - eTag); - } else { - AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); - - final long blockSize = configurationService.getAzureBlockSize(); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); - final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH); - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - - return new VersionedFileStatus( - azureBlobFileSystem.getOwnerUser(), - azureBlobFileSystem.getOwnerUserPrimaryGroup(), - parseContentLength(contentLength), - parseIsDirectory(resourceType), - 1, - blockSize, - parseLastModifiedTime(lastModified).getMillis(), - path, - eTag); - } - } - - @Override - public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException { - final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem); - - this.LOG.debug( - "listStatus filesystem: {} path: {}", - client.getFileSystem(), - path.toString()); - - String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path); - String continuation = null; - ArrayList fileStatuses = new ArrayList<>(); - - do { - AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); - ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); - if (retrievedSchema == null) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "listStatusAsync path not found", - null, op.getResult()); - } - - long blockSize = configurationService.getAzureBlockSize(); - - for (ListResultEntrySchema entry : retrievedSchema.paths()) { - long lastModifiedMillis = 0; - long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); - boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); - if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { - final DateTime dateTime = DateTime.parse( - entry.lastModified(), - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); - lastModifiedMillis = dateTime.getMillis(); - } - - fileStatuses.add( - new VersionedFileStatus( - azureBlobFileSystem.getOwnerUser(), - azureBlobFileSystem.getOwnerUserPrimaryGroup(), - contentLength, - isDirectory, - 1, - blockSize, - lastModifiedMillis, - azureBlobFileSystem.makeQualified(new Path(File.separator + entry.name())), - entry.eTag())); - } - - } while (continuation != null && !continuation.isEmpty()); - - return fileStatuses.toArray(new FileStatus[0]); - } - - @Override - public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException { - this.clientCache.remove(azureBlobFileSystem); - } - - @Override - public boolean isAtomicRenameKey(String key) { - return isKeyForDirectorySet(key, azureAtomicRenameDirSet); - } - - private String getRelativePath(final Path path) { - Preconditions.checkNotNull(path, "path"); - final String relativePath = path.toUri().getPath(); - - if (relativePath.length() == 0) { - return relativePath; - } - - if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) { - if (relativePath.length() == 1) { - return AbfsHttpConstants.EMPTY_STRING; - } - - return relativePath.substring(1); - } - - return relativePath; - } - - private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws - AzureBlobFileSystemException { - Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem"); - - AbfsClient client = this.clientCache.get(azureBlobFileSystem); - - if (client != null) { - return client; - } - - client = abfsHttpClientFactory.create(azureBlobFileSystem); - this.clientCache.put( - azureBlobFileSystem, - client); - return client; - } - - private long parseContentLength(final String contentLength) { - if (contentLength == null) { - return -1; - } - - return Long.parseLong(contentLength); - } - - private boolean parseIsDirectory(final String resourceType) { - return resourceType == null ? false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); - } - - private DateTime parseLastModifiedTime(final String lastModifiedTime) { - return DateTime.parse( - lastModifiedTime, - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); - } - - private String convertXmsPropertiesToCommaSeparatedString(final Hashtable properties) throws - CharacterCodingException { - StringBuilder commaSeparatedProperties = new StringBuilder(); - - final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder(); - - for (Map.Entry propertyEntry : properties.entrySet()) { - String key = propertyEntry.getKey(); - String value = propertyEntry.getValue(); - - Boolean canEncodeValue = encoder.canEncode(value); - if (!canEncodeValue) { - throw new CharacterCodingException(); - } - - String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array()); - commaSeparatedProperties.append(key) - .append(AbfsHttpConstants.EQUAL) - .append(encodedPropertyValue); - - commaSeparatedProperties.append(AbfsHttpConstants.COMMA); - } - - if (commaSeparatedProperties.length() != 0) { - commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1); - } - - return commaSeparatedProperties.toString(); - } - - private Hashtable parseCommaSeparatedXmsProperties(String xMsProperties) throws - InvalidFileSystemPropertyException, InvalidAbfsRestOperationException { - Hashtable properties = new Hashtable<>(); - - final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder(); - - if (xMsProperties != null && !xMsProperties.isEmpty()) { - String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA); - - if (userProperties.length == 0) { - return properties; - } - - for (String property : userProperties) { - if (property.isEmpty()) { - throw new InvalidFileSystemPropertyException(xMsProperties); - } - - String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2); - if (nameValue.length != 2) { - throw new InvalidFileSystemPropertyException(xMsProperties); - } - - byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]); - - final String value; - try { - value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString(); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } - properties.put(nameValue[0], value); - } - } - - return properties; - } - - private boolean isKeyForDirectorySet(String key, Set dirSet) { - for (String dir : dirSet) { - if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) { - return true; - } - - try { - URI uri = new URI(dir); - if (null == uri.getAuthority()) { - if (key.startsWith(dir + "/")){ - return true; - } - } - } catch (URISyntaxException e) { - this.LOG.info("URI syntax error creating URI for {}", dir); - } - } - - return false; - } - - private static class VersionedFileStatus extends FileStatus { - private final String version; - - VersionedFileStatus( - final String owner, final String group, - final long length, final boolean isdir, final int blockReplication, - final long blocksize, final long modificationTime, final Path path, - String version) { - super(length, isdir, blockReplication, blocksize, modificationTime, 0, - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), - owner, - group, - path); - - this.version = version; - } - - /** Compare if this object is equal to another object. - * @param obj the object to be compared. - * @return true if two file status has the same path name; false if not. - */ - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (obj == null) { - return false; - } - - if (this.getClass() == obj.getClass()) { - VersionedFileStatus other = (VersionedFileStatus) obj; - return this.getPath().equals(other.getPath()) && this.version.equals(other.version); - } - - return false; - } - - /** - * Returns a hash code value for the object, which is defined as - * the hash code of the path name. - * - * @return a hash code value for the path name and version - */ - @Override - public int hashCode() { - int hash = getPath().hashCode(); - hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0); - return hash; - } - - /** - * Returns the version of this FileStatus - * - * @return a string value for the FileStatus version - */ - public String getVersion() { - return this.version; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java deleted file mode 100644 index 1cbf6b5..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.util.HashMap; -import java.util.Map; - -import com.google.inject.AbstractModule; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; - -/** - * This class is responsible to configure all the services used by Azure Blob File System. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -class AbfsServiceInjectorImpl extends AbstractModule { - private final Configuration configuration; - private final Map providers; - private final Map instances; - - AbfsServiceInjectorImpl(Configuration configuration) { - this.providers = new HashMap<>(); - this.instances = new HashMap<>(); - this.configuration = configuration; - - this.instances.put(Configuration.class, this.configuration); - - this.providers.put(ConfigurationService.class, ConfigurationServiceImpl.class); - - this.providers.put(AbfsHttpService.class, AbfsHttpServiceImpl.class); - this.providers.put(AbfsHttpClientFactory.class, AbfsHttpClientFactoryImpl.class); - - this.providers.put(TracingService.class, TracingServiceImpl.class); - } - - @Override - protected void configure() { - for (Map.Entry entrySet : this.instances.entrySet()) { - bind(entrySet.getKey()).toInstance(entrySet.getValue()); - } - - for (Map.Entry entrySet : this.providers.entrySet()) { - bind(entrySet.getKey()).to(entrySet.getValue()); - } - } - - protected Configuration getConfiguration() { - return this.configuration; - } - - protected Map getProviders() { - return this.providers; - } - - protected Map getInstances() { - return this.instances; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java deleted file mode 100644 index 8560620..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.inject.Guice; -import com.google.inject.Injector; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider; -import org.apache.hadoop.fs.azurebfs.contracts.services.InjectableService; - -/** - * Dependency injected Azure Storage services provider. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public final class AbfsServiceProviderImpl implements AbfsServiceProvider { - private static AbfsServiceProviderImpl abfsServiceProvider; - private final Injector abfsServiceInjector; - - private AbfsServiceProviderImpl(final Configuration configuration) { - this.abfsServiceInjector = Guice.createInjector(new AbfsServiceInjectorImpl(Preconditions.checkNotNull(configuration, "configuration"))); - } - - @VisibleForTesting - private AbfsServiceProviderImpl(final Injector abfsServiceInjector) { - Preconditions.checkNotNull(abfsServiceInjector, "abfsServiceInjector"); - this.abfsServiceInjector = abfsServiceInjector; - } - - /** - * Create an instance or returns existing instance of service provider. - * This method must be marked as synchronized to ensure thread-safety. - * @param configuration hadoop configuration. - * @return AbfsServiceProvider the service provider instance. - */ - public static synchronized AbfsServiceProvider create(final Configuration configuration) { - if (abfsServiceProvider == null) { - abfsServiceProvider = new AbfsServiceProviderImpl(configuration); - } - - return abfsServiceProvider; - } - - /** - * Returns current instance of service provider. - * @return AbfsServiceProvider the service provider instance. - */ - public static AbfsServiceProvider instance() { - return abfsServiceProvider; - } - - @VisibleForTesting - static synchronized AbfsServiceProvider create(Injector serviceInjector) { - abfsServiceProvider = new AbfsServiceProviderImpl(serviceInjector); - return abfsServiceProvider; - } - - /** - * Returns an instance of resolved injectable service by class name. - * The injectable service must be configured first to be resolvable. - * @param clazz the injectable service which is expected to be returned. - * @param The type of injectable service. - * @return T instance - * @throws ServiceResolutionException if the service is not resolvable. - */ - @Override - public T get(final Class clazz) throws ServiceResolutionException { - try { - return this.abfsServiceInjector.getInstance(clazz); - } catch (Exception ex) { - throw new ServiceResolutionException(clazz.getSimpleName(), ex); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java deleted file mode 100644 index 568ee5d..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.lang.reflect.Field; -import java.util.Map; - -import com.google.common.annotations.VisibleForTesting; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; -import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; - -@Singleton -@InterfaceAudience.Private -@InterfaceStability.Evolving -class ConfigurationServiceImpl implements ConfigurationService { - private final Configuration configuration; - private final boolean isSecure; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, - MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, - MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, - DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) - private int writeBufferSize; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, - MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, - MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) - private int readBufferSize; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) - private int minBackoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) - private int maxBackoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) - private int backoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, - MinValue = 0, - DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) - private int maxIoRetries; - - @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, - MinValue = 0, - MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, - DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) - private long azureBlockSize; - - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, - DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) - private String azureBlockLocationHost; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, - MinValue = 1, - DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) - private int maxConcurrentWriteThreads; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, - MinValue = 1, - DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) - private int maxConcurrentReadThreads; - - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) - private boolean tolerateOobAppends; - - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, - DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) - private String azureAtomicDirs; - - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, - DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) - private boolean createRemoteFileSystemDuringInitialization; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) - private int readAheadQueueDepth; - - private Map storageAccountKeys; - - @Inject - ConfigurationServiceImpl(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { - this.configuration = configuration; - this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); - - validateStorageAccountKeys(); - Field[] fields = this.getClass().getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { - field.set(this, validateInt(field)); - } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { - field.set(this, validateLong(field)); - } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { - field.set(this, validateString(field)); - } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { - field.set(this, validateBase64String(field)); - } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { - field.set(this, validateBoolean(field)); - } - } - } - - @Override - public boolean isEmulator() { - return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); - } - - @Override - public boolean isSecureMode() { - return this.isSecure; - } - - @Override - public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { - String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); - if (accountKey == null) { - throw new ConfigurationPropertyNotFoundException(accountName); - } - - return accountKey; - } - - @Override - public Configuration getConfiguration() { - return this.configuration; - } - - @Override - public int getWriteBufferSize() { - return this.writeBufferSize; - } - - @Override - public int getReadBufferSize() { - return this.readBufferSize; - } - - @Override - public int getMinBackoffIntervalMilliseconds() { - return this.minBackoffInterval; - } - - @Override - public int getMaxBackoffIntervalMilliseconds() { - return this.maxBackoffInterval; - } - - @Override - public int getBackoffIntervalMilliseconds() { - return this.backoffInterval; - } - - @Override - public int getMaxIoRetries() { - return this.maxIoRetries; - } - - @Override - public long getAzureBlockSize() { - return this.azureBlockSize; - } - - @Override - public String getAzureBlockLocationHost() { - return this.azureBlockLocationHost; - } - - @Override - public int getMaxConcurrentWriteThreads() { - return this.maxConcurrentWriteThreads; - } - - @Override - public int getMaxConcurrentReadThreads() { - return this.maxConcurrentReadThreads; - } - - @Override - public boolean getTolerateOobAppends() { - return this.tolerateOobAppends; - } - - @Override - public String getAzureAtomicRenameDirs() { - return this.azureAtomicDirs; - } - - @Override - public boolean getCreateRemoteFileSystemDuringInitialization() { - return this.createRemoteFileSystemDuringInitialization; - } - - @Override - public int getReadAheadQueueDepth() { - return this.readAheadQueueDepth; - } - - void validateStorageAccountKeys() throws InvalidConfigurationValueException { - Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( - ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); - this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); - - for (Map.Entry account : this.storageAccountKeys.entrySet()) { - validator.validate(account.getValue()); - } - } - - int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new IntegerConfigurationBasicValidator( - validator.MinValue(), - validator.MaxValue(), - validator.DefaultValue(), - validator.ConfigurationKey(), - validator.ThrowIfInvalid()).validate(value); - } - - long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new LongConfigurationBasicValidator( - validator.MinValue(), - validator.MaxValue(), - validator.DefaultValue(), - validator.ConfigurationKey(), - validator.ThrowIfInvalid()).validate(value); - } - - String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new StringConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new Base64StringConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new BooleanConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - @VisibleForTesting - void setReadBufferSize(int bufferSize) { - this.readBufferSize = bufferSize; - } - - @VisibleForTesting - void setWriteBufferSize(int bufferSize) { - this.writeBufferSize = bufferSize; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 0c92612..54aa1ab 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -21,7 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services; import java.util.Random; import java.net.HttpURLConnection; -class ExponentialRetryPolicy { +/** + * Retry policy used by AbfsClient. + * */ +public class ExponentialRetryPolicy { /** * Represents the default number of retry attempts. */ @@ -83,7 +86,7 @@ class ExponentialRetryPolicy { /** * Initializes a new instance of the {@link ExponentialRetryPolicy} class. */ - ExponentialRetryPolicy() { + public ExponentialRetryPolicy() { this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF); } @@ -96,7 +99,7 @@ class ExponentialRetryPolicy { * @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay * between retries. */ - ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) { + public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) { this.retryCount = retryCount; this.minBackoff = minBackoff; this.maxBackoff = maxBackoff; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java deleted file mode 100644 index 99190e6..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.io.IOException; - -import com.google.common.base.Preconditions; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.htrace.core.HTraceConfiguration; -import org.apache.htrace.core.Span; -import org.apache.htrace.core.SpanReceiver; -import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; -import org.apache.htrace.fasterxml.jackson.databind.ObjectWriter; -import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * LoggerSpanReceiver is a layer between HTrace and log4j only used for {@link org.apache.hadoop.fs.azurebfs.contracts.services.TracingService} - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class LoggerSpanReceiver extends SpanReceiver { - private static final ObjectWriter JSON_WRITER = - new ObjectMapper() - .configure(SerializationFeature.INDENT_OUTPUT, true) - .configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true) - .configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, false) - .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, false) - .writer(); - - public LoggerSpanReceiver(HTraceConfiguration hTraceConfiguration) { - Preconditions.checkNotNull(hTraceConfiguration, "hTraceConfiguration"); - } - - @Override - public void receiveSpan(final Span span) { - String jsonValue; - - Logger logger = LoggerFactory.getLogger(AzureBlobFileSystem.class); - - try { - jsonValue = JSON_WRITER.writeValueAsString(span); - logger.trace(jsonValue); - } catch (JsonProcessingException e) { - logger.error("Json processing error: " + e.getMessage()); - } - } - - @Override - public void close() throws IOException { - // No-Op - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java deleted file mode 100644 index 57b6463..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Objects; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; -import org.apache.htrace.core.HTraceConfiguration; -import org.apache.htrace.core.Sampler; -import org.apache.htrace.core.SpanId; -import org.apache.htrace.core.TraceScope; -import org.apache.htrace.core.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Singleton -@InterfaceAudience.Private -@InterfaceStability.Evolving -final class TracingServiceImpl implements TracingService { - private static final Logger LOG = LoggerFactory.getLogger(TracingService.class); - - private final Tracer tracer; - private final ThreadLocal currentScopeId; - - @Inject - TracingServiceImpl( - final Configuration configuration) { - Preconditions.checkNotNull(configuration, "configuration"); - - this.currentScopeId = new ThreadLocal<>(); - - this.tracer = new Tracer.Builder(TracingService.class.getSimpleName()). - conf(new HTraceConfiguration() { - @Override - public String get(String key) { - if (Objects.equals(key, Tracer.SPAN_RECEIVER_CLASSES_KEY)) { - return LoggerSpanReceiver.class.getName(); - } - return null; - } - - @Override - public String get(String key, String defaultValue) { - String value = get(key); - if (value != null) { - return value; - } - return defaultValue; - } - }). - build(); - - this.tracer.addSampler(Sampler.ALWAYS); - } - - @Override - public TraceScope traceBegin(String description) { - if (this.LOG.isTraceEnabled()) { - TraceScope traceScope = this.tracer.newScope(description); - this.currentScopeId.set(traceScope.getSpanId()); - return traceScope; - } - - return null; - } - - @Override - public TraceScope traceBegin(String description, SpanId parentSpanId) { - if (this.LOG.isTraceEnabled()) { - TraceScope traceScope = this.tracer.newScope(description, parentSpanId); - this.currentScopeId.set(traceScope.getSpanId()); - return traceScope; - } - - return null; - } - - @Override - public void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException) { - if (this.LOG.isTraceEnabled()) { - Preconditions.checkNotNull(traceScope, "traceScope"); - Preconditions.checkNotNull(azureBlobFileSystemException, "azureBlobFileSystemException"); - - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - azureBlobFileSystemException.printStackTrace(printWriter); - printWriter.flush(); - - traceScope.addKVAnnotation("Exception", stringWriter.toString()); - } - } - - @Override - public SpanId getCurrentTraceScopeSpanId() { - return this.currentScopeId.get(); - } - - @Override - public void traceEnd(TraceScope traceScope) { - if (this.LOG.isTraceEnabled()) { - Preconditions.checkNotNull(traceScope, "traceScope"); - - SpanId[] parents = traceScope.getSpan().getParents(); - this.currentScopeId.set(parents != null && parents.length > 0 ? parents[parents.length - 1] : null); - traceScope.close(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java index 5ec1e2e..74a530c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java @@ -23,11 +23,9 @@ import java.util.UUID; import java.util.concurrent.Callable; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.mockito.internal.util.MockUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -38,12 +36,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.hadoop.fs.azurebfs.services.MockAbfsHttpClientFactoryImpl; -import org.apache.hadoop.fs.azurebfs.services.MockAbfsServiceInjectorImpl; -import org.apache.hadoop.fs.azurebfs.services.MockServiceProviderImpl; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -54,7 +46,6 @@ import static org.junit.Assume.assumeNotNull; * Provide dependencies for AzureBlobFileSystem tests. */ public abstract class DependencyInjectedTest { - private final MockAbfsServiceInjectorImpl mockServiceInjector; private final boolean isEmulator; private NativeAzureFileSystem wasb; private String abfsScheme; @@ -64,21 +55,19 @@ public abstract class DependencyInjectedTest { private String accountName; private String testUrl; + public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-"; + public DependencyInjectedTest(final boolean secure) { this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME); } - public MockAbfsServiceInjectorImpl getMockServiceInjector() { - return this.mockServiceInjector; - } - protected DependencyInjectedTest() { this(FileSystemUriSchemes.ABFS_SCHEME); } private DependencyInjectedTest(final String scheme) { abfsScheme = scheme; - fileSystemName = UUID.randomUUID().toString(); + fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString(); configuration = new Configuration(); configuration.addResource("azure-bfs-test.xml"); @@ -98,18 +87,14 @@ public abstract class DependencyInjectedTest { this.testUrl = defaultUri.toString(); configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); - this.mockServiceInjector = new MockAbfsServiceInjectorImpl(configuration); this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME); } @Before public void initialize() throws Exception { - if (this.isEmulator) { - this.mockServiceInjector.replaceProvider(AbfsHttpClientFactory.class, MockAbfsHttpClientFactoryImpl.class); - } - - MockServiceProviderImpl.create(this.mockServiceInjector); + //Create filesystem first to make sure getWasbFileSystem() can return an existed filesystem. + this.getFileSystem(); if (!this.isEmulator) { final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl())); @@ -133,28 +118,24 @@ public abstract class DependencyInjectedTest { FileSystem.closeAll(); final AzureBlobFileSystem fs = this.getFileSystem(); - final AbfsHttpService abfsHttpService = AbfsServiceProviderImpl.instance().get(AbfsHttpService.class); - abfsHttpService.deleteFilesystem(fs); - - if (!(new MockUtil().isMock(abfsHttpService))) { - AbfsRestOperationException ex = intercept( - AbfsRestOperationException.class, - new Callable() { - @Override - public Void call() throws Exception { - abfsHttpService.getFilesystemProperties(fs); - return null; - } - }); - - assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode()); - } + final AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + abfsStore.deleteFilesystem(); + + AbfsRestOperationException ex = intercept( + AbfsRestOperationException.class, + new Callable() { + @Override + public Void call() throws Exception { + fs.getAbfsStore().getFilesystemProperties(); + return null; + } + }); + + assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode()); } public AzureBlobFileSystem getFileSystem() throws Exception { - final Configuration configuration = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getConfiguration(); - final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); - return fs; + return (AzureBlobFileSystem) FileSystem.get(this.configuration); } protected NativeAzureFileSystem getWasbFileSystem() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 4985f58..ad22f99 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -22,15 +22,12 @@ import java.io.IOException; import java.util.Arrays; import java.util.Random; - -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; @@ -52,8 +49,6 @@ public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest { super(); Configuration configuration = this.getConfiguration(); configuration.set(ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, "0"); - this.getMockServiceInjector().replaceInstance(Configuration.class, configuration); - } @Test @@ -82,7 +77,7 @@ public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest { @Test (expected = IOException.class) public void testOOBWrites() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - int readBufferSize = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getReadBufferSize(); + int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize(); fs.create(TEST_FILE); FSDataOutputStream writeStream = fs.create(TEST_FILE); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 9477587..8b96c69 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -535,17 +535,16 @@ public class ITestAzureBlobFileSystemRandomRead extends DependencyInjectedTest { character = (character == 'z') ? 'a' : (char) ((int) character + 1); } - System.out.println(("Creating test file {} of size: {} " + TEST_FILE_PATH - + TEST_FILE_SIZE)); + System.out.println(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE)); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { int bytesWritten = 0; while (bytesWritten < TEST_FILE_SIZE) { outputStream.write(buffer); bytesWritten += buffer.length; } - System.out.println("Closing stream {}" + outputStream); + System.out.println(String.format("Closing stream %s", outputStream)); ContractTestUtils.NanoTimer closeTimer = new ContractTestUtils.NanoTimer(); outputStream.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java index aa30a85..29af1b8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java @@ -22,18 +22,10 @@ import java.net.URI; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; - -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doReturn; /** * Test AzureBlobFileSystem initialization. @@ -41,18 +33,11 @@ import static org.mockito.Mockito.doReturn; public class ITestFileSystemInitialization extends DependencyInjectedTest { public ITestFileSystemInitialization() { super(); - - this.getMockServiceInjector().removeProvider(AbfsHttpService.class); - this.getMockServiceInjector().replaceInstance(AbfsHttpService.class, Mockito.mock(AbfsHttpService.class)); } @Test public void ensureAzureBlobFileSystemIsInitialized() throws Exception { - doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah"))) - .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class)) - .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject()); - - final FileSystem fs = FileSystem.get(this.getConfiguration()); + final FileSystem fs = this.getFileSystem(); final String accountName = this.getAccountName(); final String filesystem = this.getFileSystemName(); @@ -62,16 +47,12 @@ public class ITestFileSystemInitialization extends DependencyInjectedTest { @Test public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception { - doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah"))) - .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class)) - .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject()); - final String accountName = this.getAccountName(); final String filesystem = this.getFileSystemName(); final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null); this.getConfiguration().set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); - final FileSystem fs = FileSystem.get(this.getConfiguration()); + final FileSystem fs = this.getFileSystem(); Assert.assertEquals(fs.getUri(), new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null)); Assert.assertNotNull(fs.getWorkingDirectory()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java new file mode 100644 index 0000000..62d967e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Hashtable; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertEquals; + +/** + * Test FileSystemProperties. + */ +public class ITestFileSystemProperties extends DependencyInjectedTest { + private static final int TEST_DATA = 100; + private static final Path TEST_PATH = new Path("/testfile"); + public ITestFileSystemProperties() { + super(); + } + + @Test + public void testReadWriteBytesToFileAndEnsureThreadPoolCleanup() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + testWriteOneByteToFileAndEnsureThreadPoolCleanup(); + + FSDataInputStream inputStream = fs.open(TEST_PATH, 4 * 1024 * 1024); + int i = inputStream.read(); + + assertEquals(TEST_DATA, i); + } + + @Test + public void testWriteOneByteToFileAndEnsureThreadPoolCleanup() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + FSDataOutputStream stream = fs.create(TEST_PATH); + + stream.write(TEST_DATA); + stream.close(); + + FileStatus fileStatus = fs.getFileStatus(TEST_PATH); + assertEquals(1, fileStatus.getLen()); + } + + @Test + @Ignore("JDK7 doesn't support PATCH, so PUT is used. Fix is applied in latest test tenant") + public void testBase64FileSystemProperties() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + + final Hashtable properties = new Hashtable<>(); + properties.put("key", "{ value: value }"); + fs.getAbfsStore().setFilesystemProperties(properties); + Hashtable fetchedProperties = fs.getAbfsStore().getFilesystemProperties(); + + Assert.assertEquals(properties, fetchedProperties); + } + + @Test + public void testBase64PathProperties() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Hashtable properties = new Hashtable<>(); + properties.put("key", "{ value: valueTest }"); + fs.create(TEST_PATH); + fs.getAbfsStore().setPathProperties(TEST_PATH, properties); + Hashtable fetchedProperties = + fs.getAbfsStore().getPathProperties(TEST_PATH); + + Assert.assertEquals(properties, fetchedProperties); + } + + @Test (expected = Exception.class) + public void testBase64InvalidFileSystemProperties() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Hashtable properties = new Hashtable<>(); + properties.put("key", "{ value: valueæ­² }"); + fs.getAbfsStore().setFilesystemProperties(properties); + Hashtable fetchedProperties = fs.getAbfsStore().getFilesystemProperties(); + + Assert.assertEquals(properties, fetchedProperties); + } + + @Test (expected = Exception.class) + public void testBase64InvalidPathProperties() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Hashtable properties = new Hashtable<>(); + properties.put("key", "{ value: valueTestå…© }"); + fs.create(TEST_PATH); + fs.getAbfsStore().setPathProperties(TEST_PATH, properties); + Hashtable fetchedProperties = fs.getAbfsStore().getPathProperties(TEST_PATH); + + Assert.assertEquals(properties, fetchedProperties); + } + + @Test + public void testSetFileSystemProperties() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Hashtable properties = new Hashtable<>(); + properties.put("containerForDevTest", "true"); + fs.getAbfsStore().setFilesystemProperties(properties); + Hashtable fetchedProperties = fs.getAbfsStore().getFilesystemProperties(); + + Assert.assertEquals(properties, fetchedProperties); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java index a55599b..ef61e52 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java @@ -20,22 +20,14 @@ package org.apache.hadoop.fs.azurebfs; import java.net.URI; -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; - -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doReturn; /** * Test AzureBlobFileSystem registration. @@ -43,17 +35,10 @@ import static org.mockito.Mockito.doReturn; public class ITestFileSystemRegistration extends DependencyInjectedTest { public ITestFileSystemRegistration() throws Exception { super(); - - this.getMockServiceInjector().removeProvider(AbfsHttpService.class); - this.getMockServiceInjector().replaceInstance(AbfsHttpService.class, Mockito.mock(AbfsHttpService.class)); } @Test public void ensureAzureBlobFileSystemIsDefaultFileSystem() throws Exception { - doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah"))) - .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class)) - .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject()); - FileSystem fs = FileSystem.get(this.getConfiguration()); Assert.assertTrue(fs instanceof AzureBlobFileSystem); @@ -63,14 +48,10 @@ public class ITestFileSystemRegistration extends DependencyInjectedTest { @Test public void ensureSecureAzureBlobFileSystemIsDefaultFileSystem() throws Exception { - doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah"))) - .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class)) - .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject()); - final String accountName = this.getAccountName(); - final String filesystem = this.getFileSystemName(); + final String fileSystemName = this.getFileSystemName(); - final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null); + final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, fileSystemName + "@" + accountName, null, null, null); this.getConfiguration().set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); FileSystem fs = FileSystem.get(this.getConfiguration()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org