From common-commits-return-86820-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Sat Aug 11 07:37:23 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3F6F2180662 for ; Sat, 11 Aug 2018 07:37:21 +0200 (CEST) Received: (qmail 79321 invoked by uid 500); 11 Aug 2018 05:37:08 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 76850 invoked by uid 99); 11 Aug 2018 05:37:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Aug 2018 05:37:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B70BAE005E; Sat, 11 Aug 2018 05:37:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tmarquardt@apache.org To: common-commits@hadoop.apache.org Date: Sat, 11 Aug 2018 05:37:46 -0000 Message-Id: <3273cb4f372144d39c4a96588e264e25@git.apache.org> In-Reply-To: <7a1eb7a94bf246f3afed47a6e76b85a2@git.apache.org> References: <7a1eb7a94bf246f3afed47a6e76b85a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [42/50] [abbrv] hadoop git commit: HADOOP-15407. HADOOP-15540. Support Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core Commit. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java new file mode 100644 index 0000000..dd59892 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java @@ -0,0 +1,507 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

/*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URLDecoder;
import java.security.GeneralSecurityException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.Charsets;

/**
 * Represents the shared key credentials used to access an Azure Storage
 * account, and signs outgoing HTTP requests with them.
 *
 * <p>Signing builds a canonicalized string from the request (verb, a fixed
 * list of standard headers, all {@code x-ms-} headers, and the resource path
 * plus query parameters), computes its HMAC-SHA256 with the account key and
 * places the Base64 result in the {@code Authorization} header.
 *
 * <p>The {@link Mac} instance is shared; access to it is serialized on
 * {@code this} inside {@link #computeHmac256(String)}.
 */
public class SharedKeyCredentials {
  private static final int EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH = 300;
  private static final Pattern CRLF = Pattern.compile("\r\n", Pattern.LITERAL);
  private static final String HMAC_SHA256 = "HmacSHA256";
  private static final Base64 BASE_64 = new Base64();

  /**
   * Stores a reference to the RFC1123 date/time pattern.
   */
  private static final String RFC1123_PATTERN = "EEE, dd MMM yyyy HH:mm:ss z";

  /**
   * Time zone used when formatting the request date.
   */
  public static final TimeZone GMT_ZONE = TimeZone.getTimeZone(AbfsHttpConstants.GMT_TIMEZONE);

  /**
   * Thread local for storing the GMT date format; {@link SimpleDateFormat}
   * is not thread-safe, so every thread gets its own instance.
   */
  private static final ThreadLocal<DateFormat> rfc1123GmtDateTimeFormatter
      = new ThreadLocal<DateFormat>() {
        @Override
        protected DateFormat initialValue() {
          final DateFormat formatter = new SimpleDateFormat(RFC1123_PATTERN, Locale.ROOT);
          formatter.setTimeZone(GMT_ZONE);
          return formatter;
        }
      };

  private final String accountName;
  private final byte[] accountKey;
  private Mac hmacSha256;

  /**
   * Creates credentials from an account name and its Base64-encoded key.
   *
   * @param accountName the storage account name; must not be null or empty.
   * @param accountKey  the Base64-encoded account key; must not be null or empty.
   * @throws IllegalArgumentException if either argument is null or empty, or
   *                                  if the HMAC cannot be initialized with the key.
   */
  public SharedKeyCredentials(final String accountName,
                              final String accountKey) {
    if (accountName == null || accountName.isEmpty()) {
      throw new IllegalArgumentException("Invalid account name.");
    }
    if (accountKey == null || accountKey.isEmpty()) {
      throw new IllegalArgumentException("Invalid account key.");
    }
    this.accountName = accountName;
    this.accountKey = BASE_64.decode(accountKey);
    initializeMac();
  }

  /**
   * Signs the request: sets the {@code x-ms-date} header to the current GMT
   * time, then sets the {@code Authorization} header to
   * {@code SharedKey <account>:<signature>}.
   *
   * @param connection    the connection to sign; its request properties are modified.
   * @param contentLength the length of the request body in bytes, -1 if unknown.
   * @throws UnsupportedEncodingException if UTF-8 decoding of the query string is not supported.
   * @throws IllegalArgumentException     if {@code contentLength} is less than -1.
   */
  public void signRequest(HttpURLConnection connection, final long contentLength) throws UnsupportedEncodingException {

    // The date must be set before canonicalization because x-ms-date is part
    // of the string to sign.
    connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, getGMTTime());

    final String stringToSign = canonicalize(connection, accountName, contentLength);

    final String computedBase64Signature = computeHmac256(stringToSign);

    connection.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
        String.format("%s %s:%s", "SharedKey", accountName, computedBase64Signature));
  }

  /**
   * Computes the Base64-encoded HMAC-SHA256 of the given string (UTF-8 bytes).
   *
   * @param stringToSign the canonicalized request string.
   * @return the Base64 text of the MAC.
   */
  private String computeHmac256(final String stringToSign) {
    final byte[] utf8Bytes;
    try {
      utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
    } catch (final UnsupportedEncodingException e) {
      throw new IllegalArgumentException(e);
    }
    final byte[] hmac;
    // Mac is stateful and not thread-safe; serialize use of the shared instance.
    synchronized (this) {
      hmac = hmacSha256.doFinal(utf8Bytes);
    }
    return new String(BASE_64.encode(hmac), Charsets.UTF_8);
  }

  /**
   * Add x-ms- prefixed headers in a fixed (sorted, lower-cased) order.
   *
   * @param conn                the HttpURLConnection for the operation
   * @param canonicalizedString the canonicalized string to add the canonicalized headers to.
   */
  private static void addCanonicalizedHeaders(final HttpURLConnection conn, final StringBuilder canonicalizedString) {
    // Look for header names that start with the storage header prefix,
    // then sort them in a case-insensitive manner (by lower-casing first).
    final Map<String, List<String>> headers = conn.getRequestProperties();
    final List<String> httpStorageHeaderNameArray = new ArrayList<>();

    for (final String key : headers.keySet()) {
      if (key == null) {
        continue;
      }
      final String lowerCasedKey = key.toLowerCase(Locale.ROOT);
      if (lowerCasedKey.startsWith(AbfsHttpConstants.HTTP_HEADER_PREFIX)) {
        httpStorageHeaderNameArray.add(lowerCasedKey);
      }
    }

    Collections.sort(httpStorageHeaderNameArray);

    // Now go through each header's values in the sorted order and append
    // them to the canonicalized string.
    for (final String key : httpStorageHeaderNameArray) {
      final StringBuilder canonicalizedElement = new StringBuilder(key);
      String delimiter = ":";
      boolean hasValue = false;

      // Go through values, unfold them, and then append them to the
      // canonicalized element string.
      for (final String value : getHeaderValues(headers, key)) {
        if (value == null) {
          // A header without a value contributes nothing; previously this
          // case ended in a NullPointerException.
          continue;
        }
        hasValue = true;

        // Unfolding is simply removal of CRLF.
        final String unfoldedValue = CRLF.matcher(value)
            .replaceAll(Matcher.quoteReplacement(""));

        canonicalizedElement.append(delimiter);
        canonicalizedElement.append(unfoldedValue);
        delimiter = ",";
      }

      // Only headers that carried at least one value are signed.
      if (hasValue) {
        appendCanonicalizedElement(canonicalizedString, canonicalizedElement.toString());
      }
    }
  }

  /**
   * Initialize the HmacSha256 associated with the account key.
   *
   * @throws IllegalArgumentException if the algorithm is unavailable or the key is rejected.
   */
  private void initializeMac() {
    try {
      hmacSha256 = Mac.getInstance(HMAC_SHA256);
      hmacSha256.init(new SecretKeySpec(accountKey, HMAC_SHA256));
    } catch (final GeneralSecurityException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /**
   * Append a newline followed by the given string to a string builder.
   *
   * @param builder the StringBuilder object
   * @param element the string to append.
   */
  private static void appendCanonicalizedElement(final StringBuilder builder, final String element) {
    builder.append("\n");
    builder.append(element);
  }

  /**
   * Constructs a canonicalized string from the request's headers that will be used to construct the signature string
   * for signing a request under the Shared Key authentication scheme.
   *
   * @param address       the request URI
   * @param accountName   the account name associated with the request
   * @param method        the verb to be used for the HTTP request.
   * @param contentType   the content type of the HTTP request.
   * @param contentLength the length of the content written to the outputstream in bytes, -1 if unknown
   * @param date          the date/time specification for the HTTP request; only used when the
   *                      x-ms-date header is absent, and treated as empty when null.
   * @param conn          the HttpURLConnection for the operation.
   * @return A canonicalized string.
   * @throws UnsupportedEncodingException if UTF-8 decoding of the query string is not supported.
   */
  private static String canonicalizeHttpRequest(final java.net.URL address, final String accountName,
      final String method, final String contentType, final long contentLength, final String date,
      final HttpURLConnection conn) throws UnsupportedEncodingException {

    // The first element should be the Method of the request.
    // I.e. GET, POST, PUT, or HEAD.
    final StringBuilder canonicalizedString = new StringBuilder(EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH);
    canonicalizedString.append(method);

    // The next elements are the standard headers, in a fixed order.
    // If any element is missing it may be empty.
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_ENCODING, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_LANGUAGE, AbfsHttpConstants.EMPTY_STRING));
    // A zero or unknown length is signed as the empty string.
    appendCanonicalizedElement(canonicalizedString,
        contentLength <= 0 ? "" : String.valueOf(contentLength));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_MD5, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString, contentType != null ? contentType : AbfsHttpConstants.EMPTY_STRING);

    final String dateString = getHeaderValue(conn, HttpHeaderConfigurations.X_MS_DATE, AbfsHttpConstants.EMPTY_STRING);
    // If x-ms-date header exists, Date should be empty string. A null date
    // is signed as empty rather than as the text "null".
    final String fallbackDate = date == null ? "" : date;
    appendCanonicalizedElement(canonicalizedString,
        dateString.equals(AbfsHttpConstants.EMPTY_STRING) ? fallbackDate : "");

    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.IF_MODIFIED_SINCE, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.IF_MATCH, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.IF_UNMODIFIED_SINCE, AbfsHttpConstants.EMPTY_STRING));
    appendCanonicalizedElement(canonicalizedString,
        getHeaderValue(conn, HttpHeaderConfigurations.RANGE, AbfsHttpConstants.EMPTY_STRING));

    addCanonicalizedHeaders(conn, canonicalizedString);

    appendCanonicalizedElement(canonicalizedString, getCanonicalizedResource(address, accountName));

    return canonicalizedString.toString();
  }

  /**
   * Gets the canonicalized resource string for a request under the Shared Key
   * authentication scheme: {@code /<account><path>} followed by one
   * {@code \n<lower-cased name>:<sorted, comma-separated values>} entry per
   * query parameter, ordered by parameter name.
   *
   * @param address     the resource URI.
   * @param accountName the account name for the request.
   * @return the canonicalized resource string.
   * @throws UnsupportedEncodingException if UTF-8 decoding of the query string is not supported.
   */
  private static String getCanonicalizedResource(final java.net.URL address, final String accountName) throws UnsupportedEncodingException {
    // Resource path
    final StringBuilder resourcepath = new StringBuilder(AbfsHttpConstants.FORWARD_SLASH);
    resourcepath.append(accountName);

    // Note that AbsolutePath starts with a '/'.
    resourcepath.append(address.getPath());
    final StringBuilder canonicalizedResource = new StringBuilder(resourcepath.toString());

    // query parameters
    final String query = address.getQuery();
    if (query == null || !query.contains(AbfsHttpConstants.EQUAL)) {
      //no query params.
      return canonicalizedResource.toString();
    }

    final Map<String, String[]> queryVariables = parseQueryString(query);

    final Map<String, String> lowercasedKeyNameValue = new HashMap<>();

    for (final Entry<String, String[]> entry : queryVariables.entrySet()) {
      // sort the value and organize it as comma separated values
      final List<String> sortedValues = Arrays.asList(entry.getValue());
      Collections.sort(sortedValues);

      final StringBuilder stringValue = new StringBuilder();

      for (final String value : sortedValues) {
        if (stringValue.length() > 0) {
          stringValue.append(AbfsHttpConstants.COMMA);
        }

        stringValue.append(value);
      }

      final String name = entry.getKey();
      lowercasedKeyNameValue.put(name == null ? null : name.toLowerCase(Locale.ROOT),
          stringValue.toString());
    }

    final List<String> sortedKeys = new ArrayList<>(lowercasedKeyNameValue.keySet());

    Collections.sort(sortedKeys);

    for (final String key : sortedKeys) {
      final StringBuilder queryParamString = new StringBuilder();

      queryParamString.append(key);
      queryParamString.append(":");
      queryParamString.append(lowercasedKeyNameValue.get(key));

      appendCanonicalizedElement(canonicalizedResource, queryParamString.toString());
    }

    return canonicalizedResource.toString();
  }

  /**
   * Gets all the values for the given header in the one to many map, performs a trimStart() on each return value.
   *
   * @param headers    a one to many map of key / values representing the header values for the connection.
   * @param headerName the lower-cased name of the header to lookup
   * @return a list of all left-trimmed values corresponding to the requested headerName. This may be empty
   *         if the header is not found; a null value in the map is kept as null.
   */
  private static List<String> getHeaderValues(final Map<String, List<String>> headers, final String headerName) {

    final List<String> arrayOfValues = new ArrayList<>();
    List<String> values = null;

    // Header names are compared case-insensitively; the first match wins.
    for (final Entry<String, List<String>> entry : headers.entrySet()) {
      final String name = entry.getKey();
      if (name != null && name.toLowerCase(Locale.ROOT).equals(headerName)) {
        values = entry.getValue();
        break;
      }
    }
    if (values != null) {
      for (final String value : values) {
        // canonicalization formula requires the string to be left
        // trimmed.
        arrayOfValues.add(trimStart(value));
      }
    }
    return arrayOfValues;
  }

  /**
   * Parses a query string into a one to many hashmap. Pairs without an
   * {@code =} or with nothing after it are ignored. Values of a repeated
   * parameter are collected in order of appearance.
   *
   * @param parseString the string to parse
   * @return a map from decoded parameter name to its decoded values.
   * @throws UnsupportedEncodingException if UTF-8 decoding is not supported.
   */
  private static Map<String, String[]> parseQueryString(String parseString) throws UnsupportedEncodingException {
    final Map<String, String[]> retVals = new HashMap<>();
    if (parseString == null || parseString.isEmpty()) {
      return retVals;
    }

    // 1. Remove ? if present
    final int queryDex = parseString.indexOf(AbfsHttpConstants.QUESTION_MARK);
    if (queryDex >= 0 && parseString.length() > 0) {
      parseString = parseString.substring(queryDex + 1);
    }

    // 2. split name value pairs by splitting on the '&' character
    //    (or on ';' when the string contains no '&')
    final String[] valuePairs = parseString.contains(AbfsHttpConstants.AND_MARK)
        ? parseString.split(AbfsHttpConstants.AND_MARK)
        : parseString.split(AbfsHttpConstants.SEMICOLON);

    // 3. for each field value pair parse into appropriate map entries
    for (final String valuePair : valuePairs) {
      final int equalDex = valuePair.indexOf(AbfsHttpConstants.EQUAL);

      if (equalDex < 0 || equalDex == valuePair.length() - 1) {
        continue;
      }

      final String key = safeDecode(valuePair.substring(0, equalDex));
      final String value = safeDecode(valuePair.substring(equalDex + 1));

      if (value.equals("")) {
        continue;
      }

      // 3.1 add to map, keeping earlier values of a repeated parameter
      final String[] existing = retVals.get(key);
      if (existing == null) {
        retVals.put(key, new String[]{value});
      } else {
        final String[] extended = Arrays.copyOf(existing, existing.length + 1);
        extended[existing.length] = value;
        retVals.put(key, extended);
      }
    }

    return retVals;
  }

  /**
   * Performs safe decoding of the specified string, taking care to preserve each + character, rather
   * than replacing it with a space character.
   *
   * @param stringToDecode A String that represents the string to decode.
   * @return A String that represents the decoded string, or null if the input was null.
   * @throws UnsupportedEncodingException if UTF-8 decoding is not supported.
   */
  private static String safeDecode(final String stringToDecode) throws UnsupportedEncodingException {
    if (stringToDecode == null) {
      return null;
    }

    if (stringToDecode.length() == 0) {
      return "";
    }

    // URLDecoder turns a raw '+' into a space; escaping it first makes the
    // decoder hand the '+' back unchanged.
    return URLDecoder.decode(stringToDecode.replace("+", "%2B"), AbfsHttpConstants.UTF_8);
  }

  /**
   * Removes leading space characters (U+0020 only) from the value.
   *
   * @param value the string to trim; may be null.
   * @return the value without leading spaces, or null if the value was null.
   */
  private static String trimStart(final String value) {
    if (value == null) {
      return null;
    }
    int spaceDex = 0;
    while (spaceDex < value.length() && value.charAt(spaceDex) == ' ') {
      spaceDex++;
    }

    return value.substring(spaceDex);
  }

  /**
   * Reads a single request header from the connection.
   *
   * @param conn         the connection to read from.
   * @param headerName   the header to read.
   * @param defaultValue the value to return when the header is not set.
   * @return the header value, or {@code defaultValue} when it is absent.
   */
  private static String getHeaderValue(final HttpURLConnection conn, final String headerName, final String defaultValue) {
    final String headerValue = conn.getRequestProperty(headerName);
    return headerValue == null ? defaultValue : headerValue;
  }


  /**
   * Constructs a canonicalized string for signing a request.
   *
   * @param conn          the HttpURLConnection to canonicalize
   * @param accountName   the account name associated with the request
   * @param contentLength the length of the content written to the outputstream in bytes,
   *                      -1 if unknown
   * @return a canonicalized string.
   * @throws UnsupportedEncodingException if UTF-8 decoding of the query string is not supported.
   * @throws IllegalArgumentException     if {@code contentLength} is less than -1.
   */
  private String canonicalize(final HttpURLConnection conn,
                              final String accountName,
                              final long contentLength) throws UnsupportedEncodingException {

    if (contentLength < -1) {
      throw new IllegalArgumentException(
          "The Content-Length header must be greater than or equal to -1.");
    }

    final String contentType = getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_TYPE, "");

    return canonicalizeHttpRequest(conn.getURL(), accountName,
        conn.getRequestMethod(), contentType, contentLength, null, conn);
  }

  /**
   * Returns the current GMT date/time String using the RFC1123 pattern.
   *
   * @return A String that represents the current GMT date/time using the RFC1123 pattern.
   */
  static String getGMTTime() {
    return getGMTTime(new Date());
  }

  /**
   * Returns the GMT date/time String for the specified value using the RFC1123 pattern.
   *
   * @param date
   *            A Date object that represents the date to convert to GMT date/time in the RFC1123
   *            pattern.
   *
   * @return A String that represents the GMT date/time for the specified value using the RFC1123
   *         pattern.
   */
  static String getGMTTime(final Date date) {
    return rfc1123GmtDateTimeFormatter.get().format(date);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Objects;

import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TracingService} implementation backed by an HTrace {@link Tracer}
 * whose span receiver is {@code LoggerSpanReceiver}.
 *
 * <p>All tracing operations are no-ops unless TRACE logging is enabled for
 * the {@link TracingService} logger; in that case {@code traceBegin} returns
 * {@code null}, so callers must tolerate a null scope. The id of the most
 * recently opened scope is tracked per thread.
 */
@Singleton
@InterfaceAudience.Private
@InterfaceStability.Evolving
final class TracingServiceImpl implements TracingService {
  // Deliberately named after the service interface, not this implementation.
  private static final Logger LOG = LoggerFactory.getLogger(TracingService.class);

  private final Tracer tracer;
  private final ThreadLocal<SpanId> currentScopeId;

  /**
   * Builds the tracer with a configuration that only answers the
   * span-receiver key, and samples every span.
   *
   * @param configuration the Hadoop configuration; must not be null.
   */
  @Inject
  TracingServiceImpl(
      final Configuration configuration) {
    Preconditions.checkNotNull(configuration, "configuration");

    this.currentScopeId = new ThreadLocal<>();

    this.tracer = new Tracer.Builder(TracingService.class.getSimpleName()).
        conf(new HTraceConfiguration() {
          @Override
          public String get(String key) {
            // The only setting supplied: route spans to the logger receiver.
            if (Objects.equals(key, Tracer.SPAN_RECEIVER_CLASSES_KEY)) {
              return LoggerSpanReceiver.class.getName();
            }
            return null;
          }

          @Override
          public String get(String key, String defaultValue) {
            String value = get(key);
            if (value != null) {
              return value;
            }
            return defaultValue;
          }
        }).
        build();

    this.tracer.addSampler(Sampler.ALWAYS);
  }

  /**
   * Opens a new trace scope and records its span id for the current thread.
   *
   * @param description the scope description.
   * @return the new scope, or {@code null} when TRACE logging is disabled.
   */
  @Override
  public TraceScope traceBegin(String description) {
    if (LOG.isTraceEnabled()) {
      TraceScope traceScope = this.tracer.newScope(description);
      this.currentScopeId.set(traceScope.getSpanId());
      return traceScope;
    }

    return null;
  }

  /**
   * Opens a new trace scope under the given parent and records its span id
   * for the current thread.
   *
   * @param description  the scope description.
   * @param parentSpanId the span id of the parent scope.
   * @return the new scope, or {@code null} when TRACE logging is disabled.
   */
  @Override
  public TraceScope traceBegin(String description, SpanId parentSpanId) {
    if (LOG.isTraceEnabled()) {
      TraceScope traceScope = this.tracer.newScope(description, parentSpanId);
      this.currentScopeId.set(traceScope.getSpanId());
      return traceScope;
    }

    return null;
  }

  /**
   * Attaches the exception's stack trace to the scope as an "Exception"
   * annotation. Does nothing when TRACE logging is disabled.
   *
   * @param traceScope                   the scope to annotate; must not be null when tracing is enabled.
   * @param azureBlobFileSystemException the exception to record; must not be null when tracing is enabled.
   */
  @Override
  public void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException) {
    if (LOG.isTraceEnabled()) {
      Preconditions.checkNotNull(traceScope, "traceScope");
      Preconditions.checkNotNull(azureBlobFileSystemException, "azureBlobFileSystemException");

      // Render the stack trace into a string; nothing is written to stderr.
      StringWriter stringWriter = new StringWriter();
      PrintWriter printWriter = new PrintWriter(stringWriter);
      azureBlobFileSystemException.printStackTrace(printWriter);
      printWriter.flush();

      traceScope.addKVAnnotation("Exception", stringWriter.toString());
    }
  }

  /**
   * @return the span id last recorded for the current thread, or
   *         {@code null} if none has been recorded.
   */
  @Override
  public SpanId getCurrentTraceScopeSpanId() {
    return this.currentScopeId.get();
  }

  /**
   * Closes the scope and makes its last parent (if any) the current span id
   * for this thread. Does nothing when TRACE logging is disabled.
   *
   * @param traceScope the scope to close; must not be null when tracing is enabled.
   */
  @Override
  public void traceEnd(TraceScope traceScope) {
    if (LOG.isTraceEnabled()) {
      Preconditions.checkNotNull(traceScope, "traceScope");

      SpanId[] parents = traceScope.getSpan().getParents();
      this.currentScopeId.set(parents != null && parents.length > 0 ? parents[parents.length - 1] : null);
      traceScope.close();
    }
  }
}
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.fs.azurebfs.services; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java new file mode 100644 index 0000000..7652adf --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +import java.util.regex.Pattern; + +/** + * Utility class to help with Abfs url transformation to blob urls. 
/**
 * Static helpers for recognising ABFS account host names
 * ({@code <account>.dfs.core.windows.net}, optionally with a
 * {@code preprod.} label before {@code core}) and deriving values from them.
 */
public final class UriUtils {
  private static final String ABFS_URI_REGEX = "[^.]+\\.dfs\\.(preprod\\.)?core\\.windows\\.net";
  private static final Pattern ABFS_URI_PATTERN = Pattern.compile(ABFS_URI_REGEX);

  /**
   * Checks whether a string is an abfs account host name. The whole string
   * must match; a host name embedded in a longer string (for example a full
   * URL with scheme or path) is not accepted.
   *
   * @param string the string to check.
   * @return true if the entire string has the abfs host name form; false
   *         for null, empty or any other input.
   */
  public static boolean containsAbfsUrl(final String string) {
    if (string == null || string.isEmpty()) {
      return false;
    }

    return ABFS_URI_PATTERN.matcher(string).matches();
  }

  /**
   * Extracts the raw account name (the label before the first dot) from a
   * fully qualified abfs account name.
   *
   * @param accountName the fully qualified account name.
   * @return the raw account name, or null if {@code accountName} is null,
   *         empty or not an abfs host name.
   */
  public static String extractRawAccountFromAccountName(final String accountName) {
    // containsAbfsUrl already rejects null and empty input.
    if (!containsAbfsUrl(accountName)) {
      return null;
    }

    // A successful match guarantees at least one character before the first dot.
    return accountName.substring(0, accountName.indexOf('.'));
  }

  /**
   * Generate unique test path for multiple user tests, based on the
   * {@code test.unique.fork.id} system property.
   *
   * @return {@code /test} when the property is not set, otherwise
   *         {@code /<fork id>/test}.
   */
  public static String generateUniqueTestPath() {
    String testUniqueForkId = System.getProperty("test.unique.fork.id");
    return testUniqueForkId == null ? "/test" : "/" + testUniqueForkId + "/test";
  }

  private UriUtils() {
  }
}
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.fs.azurebfs.utils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java new file mode 100644 index 0000000..5ec1e2e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.net.URI; +import java.util.UUID; +import java.util.concurrent.Callable; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.internal.util.MockUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.fs.azurebfs.services.MockAbfsHttpClientFactoryImpl; +import org.apache.hadoop.fs.azurebfs.services.MockAbfsServiceInjectorImpl; +import org.apache.hadoop.fs.azurebfs.services.MockServiceProviderImpl; + +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; + +/** + * Provide dependencies for AzureBlobFileSystem tests. 
+ */ +public abstract class DependencyInjectedTest { + private final MockAbfsServiceInjectorImpl mockServiceInjector; + private final boolean isEmulator; + private NativeAzureFileSystem wasb; + private String abfsScheme; + + private Configuration configuration; + private String fileSystemName; + private String accountName; + private String testUrl; + + public DependencyInjectedTest(final boolean secure) { + this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME); + } + + public MockAbfsServiceInjectorImpl getMockServiceInjector() { + return this.mockServiceInjector; + } + + protected DependencyInjectedTest() { + this(FileSystemUriSchemes.ABFS_SCHEME); + } + + private DependencyInjectedTest(final String scheme) { + abfsScheme = scheme; + fileSystemName = UUID.randomUUID().toString(); + configuration = new Configuration(); + configuration.addResource("azure-bfs-test.xml"); + + assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME)); + assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX + configuration.get(TestConfigurationKeys + .FS_AZURE_TEST_ACCOUNT_NAME))); + + final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName(); + URI defaultUri = null; + + try { + defaultUri = new URI(abfsScheme, abfsUrl, null, null, null); + } catch (Exception ex) { + Assert.fail(ex.getMessage()); + } + + this.testUrl = defaultUri.toString(); + configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); + configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); + this.mockServiceInjector = new MockAbfsServiceInjectorImpl(configuration); + this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME); + } + + @Before + public void initialize() throws Exception { + if 
(this.isEmulator) { + this.mockServiceInjector.replaceProvider(AbfsHttpClientFactory.class, MockAbfsHttpClientFactoryImpl.class); + } + + MockServiceProviderImpl.create(this.mockServiceInjector); + + if (!this.isEmulator) { + final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl())); + final AzureNativeFileSystemStore azureNativeFileSystemStore = new AzureNativeFileSystemStore(); + azureNativeFileSystemStore.initialize( + wasbUri, + this.getConfiguration(), + new AzureFileSystemInstrumentation(this.getConfiguration())); + + this.wasb = new NativeAzureFileSystem(azureNativeFileSystemStore); + this.wasb.initialize(wasbUri, configuration); + } + } + + @After + public void testCleanup() throws Exception { + if (this.wasb != null) { + this.wasb.close(); + } + + FileSystem.closeAll(); + + final AzureBlobFileSystem fs = this.getFileSystem(); + final AbfsHttpService abfsHttpService = AbfsServiceProviderImpl.instance().get(AbfsHttpService.class); + abfsHttpService.deleteFilesystem(fs); + + if (!(new MockUtil().isMock(abfsHttpService))) { + AbfsRestOperationException ex = intercept( + AbfsRestOperationException.class, + new Callable() { + @Override + public Void call() throws Exception { + abfsHttpService.getFilesystemProperties(fs); + return null; + } + }); + + assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode()); + } + } + + public AzureBlobFileSystem getFileSystem() throws Exception { + final Configuration configuration = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getConfiguration(); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + return fs; + } + + protected NativeAzureFileSystem getWasbFileSystem() { + return this.wasb; + } + + protected String getHostName() { + return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_HOST_NAME); + } + + protected void updateTestUrl(String testUrl) { + this.testUrl = testUrl; + } + protected String getTestUrl() { + return testUrl; + } + + 
protected void updateFileSystemName(String fileSystemName) { + this.fileSystemName = fileSystemName; + } + protected String getFileSystemName() { + return fileSystemName; + } + + protected String getAccountName() { + return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME); + } + + protected String getAccountKey() { + return configuration.get( + TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX + + getAccountName()); + } + + protected Configuration getConfiguration() { + return this.configuration; + } + + protected boolean isEmulator() { + return isEmulator; + } + + protected static String wasbUrlToAbfsUrl(final String wasbUrl) { + return convertTestUrls( + wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX, + FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX); + } + + protected static String abfsUrlToWasbUrl(final String abfsUrl) { + return convertTestUrls( + abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX, + FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX); + } + + private static String convertTestUrls( + final String url, final String fromNonSecureScheme, final String fromSecureScheme, final String fromDnsPrefix, + final String toNonSecureScheme, final String toSecureScheme, final String toDnsPrefix) { + String data = null; + if (url.startsWith(fromNonSecureScheme + "://")) { + data = url.replace(fromNonSecureScheme + "://", toNonSecureScheme + "://"); + } else if (url.startsWith(fromSecureScheme + "://")) { + data = url.replace(fromSecureScheme + "://", toSecureScheme + "://"); + } + + data = data.replace("." + fromDnsPrefix + ".", "." 
+ toDnsPrefix + "."); + return data; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java new file mode 100644 index 0000000..10d42d1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; +import java.util.Random; + +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertEquals; + +/** + * Test append operations. 
+ */ +public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest { + private static final Path TEST_FILE_PATH = new Path("testfile"); + private static final Path TEST_FOLDER_PATH = new Path("testFolder"); + public ITestAzureBlobFileSystemAppend() { + super(); + } + + @Test(expected = FileNotFoundException.class) + public void testAppendDirShouldFail() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Path filePath = TEST_FILE_PATH; + fs.mkdirs(filePath); + fs.append(filePath, 0); + } + + @Test + public void testAppendWithLength0() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + FSDataOutputStream stream = fs.create(TEST_FILE_PATH); + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + stream.write(b, 1000, 0); + + assertEquals(0, stream.getPos()); + } + + + @Test(expected = FileNotFoundException.class) + public void testAppendFileAfterDelete() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Path filePath = TEST_FILE_PATH; + fs.create(filePath); + fs.delete(filePath, false); + + fs.append(filePath); + } + + @Test(expected = FileNotFoundException.class) + public void testAppendDirectory() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final Path folderPath = TEST_FOLDER_PATH; + fs.mkdirs(folderPath); + fs.append(folderPath); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java new file mode 100644 index 0000000..d107c9d --- /dev/null +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.junit.Test; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test AzureBlobFileSystem back compatibility with WASB. 
+ */ +public class ITestAzureBlobFileSystemBackCompat extends DependencyInjectedTest { + public ITestAzureBlobFileSystemBackCompat() { + super(); + } + + @Test + public void testBlobBackCompat() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + String storageConnectionString = getBlobConnectionString(); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobContainer container = blobClient.getContainerReference(this.getFileSystemName()); + container.createIfNotExists(); + + CloudBlockBlob blockBlob = container.getBlockBlobReference("test/10/10/10"); + blockBlob.uploadText(""); + + blockBlob = container.getBlockBlobReference("test/10/123/3/2/1/3"); + blockBlob.uploadText(""); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/test/10/")); + assertEquals(fileStatuses.length, 2); + assertEquals(fileStatuses[0].getPath().getName(), "10"); + assertTrue(fileStatuses[0].isDirectory()); + assertEquals(fileStatuses[0].getLen(), 0); + assertEquals(fileStatuses[1].getPath().getName(), "123"); + assertTrue(fileStatuses[1].isDirectory()); + assertEquals(fileStatuses[1].getLen(), 0); + } + + private String getBlobConnectionString() { + String connectionString; + if (isEmulator()) { + connectionString = "DefaultEndpointsProtocol=http;BlobEndpoint=http://" + + this.getHostName() + ":8880/" + this.getAccountName().split("\\.") [0] + + ";AccountName=" + this.getAccountName().split("\\.")[0] + + ";AccountKey=" + this.getAccountKey(); + } + else { + connectionString = "DefaultEndpointsProtocol=http;BlobEndpoint=http://" + + this.getAccountName().replaceFirst("\\.dfs\\.", ".blob.") + + ";AccountName=" + this.getAccountName().split("\\.")[0] + + ";AccountKey=" + this.getAccountKey(); + } + + return connectionString; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java new file mode 100644 index 0000000..c158e03 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test copy operation. + */ +public class ITestAzureBlobFileSystemCopy extends DependencyInjectedTest { + public ITestAzureBlobFileSystemCopy() { + super(); + } + + @Test + public void testCopyFromLocalFileSystem() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + Path localFilePath = new Path(System.getProperty("test.build.data", + "azure_test")); + FileSystem localFs = FileSystem.get(new Configuration()); + localFs.delete(localFilePath, true); + try { + writeString(localFs, localFilePath, "Testing"); + Path dstPath = new Path("copiedFromLocal"); + assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false, + fs.getConf())); + assertTrue(fs.exists(dstPath)); + assertEquals("Testing", readString(fs, dstPath)); + fs.delete(dstPath, true); + } finally { + localFs.delete(localFilePath, true); + } + } + + private String readString(FileSystem fs, Path testFile) throws IOException { + FSDataInputStream inputStream = fs.open(testFile); + String ret = readString(inputStream); + inputStream.close(); + return ret; + } + + private String readString(FSDataInputStream inputStream) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader( + inputStream)); + final int bufferSize = 1024; + char[] buffer = new char[bufferSize]; + int count = reader.read(buffer, 0, bufferSize); + if (count > bufferSize) { + throw new 
IOException("Exceeded buffer size"); + } + inputStream.close(); + return new String(buffer, 0, count); + } + + private void writeString(FileSystem fs, Path path, String value) + throws IOException { + FSDataOutputStream outputStream = fs.create(path, true); + writeString(outputStream, value); + } + + private void writeString(FSDataOutputStream outputStream, String value) + throws IOException { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter( + outputStream)); + writer.write(value); + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java new file mode 100644 index 0000000..c9b99e6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; +import java.util.EnumSet; + +import org.junit.Test; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test create operation. + */ +public class ITestAzureBlobFileSystemCreate extends DependencyInjectedTest { + private static final Path TEST_FILE_PATH = new Path("testfile"); + private static final Path TEST_FOLDER_PATH = new Path("testFolder"); + private static final String TEST_CHILD_FILE = "childFile"; + public ITestAzureBlobFileSystemCreate() { + super(); + } + + @Test(expected = FileAlreadyExistsException.class) + public void testCreateFileWithExistingDir() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.mkdirs(TEST_FOLDER_PATH); + fs.create(TEST_FOLDER_PATH); + } + + @Test + public void testEnsureFileCreated() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.create(TEST_FILE_PATH); + + FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH); + assertNotNull(fileStatus); + } + + @Test + @SuppressWarnings("deprecation") + public void testCreateNonRecursive() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE); + try { + fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null); + assertTrue("Should've thrown", false); + } catch (FileNotFoundException e) { + } + fs.mkdirs(TEST_FOLDER_PATH); + fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null) + .close(); + assertTrue(fs.exists(testFile)); + } + + @Test + @SuppressWarnings("deprecation") + public void testCreateNonRecursive1() throws Exception { + final AzureBlobFileSystem fs = 
this.getFileSystem(); + Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE); + try { + fs.createNonRecursive(testFile, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024, (short) 1, 1024, null); + assertTrue("Should've thrown", false); + } catch (FileNotFoundException e) { + } + fs.mkdirs(TEST_FOLDER_PATH); + fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null) + .close(); + assertTrue(fs.exists(testFile)); + } + + @Test + @SuppressWarnings("deprecation") + public void testCreateNonRecursive2() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + + Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE); + try { + fs.createNonRecursive(testFile, FsPermission.getDefault(), false, 1024, (short) 1, 1024, null); + assertTrue("Should've thrown", false); + } catch (FileNotFoundException e) { + } + fs.mkdirs(TEST_FOLDER_PATH); + fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null) + .close(); + assertTrue(fs.exists(testFile)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java new file mode 100644 index 0000000..372a087 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Test; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertEquals; + +/** + * Test delete operation. 
+ */ +public class ITestAzureBlobFileSystemDelete extends DependencyInjectedTest { + public ITestAzureBlobFileSystemDelete() { + super(); + } + + @Test + public void testDeleteRoot() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + + fs.mkdirs(new Path("/testFolder0")); + fs.mkdirs(new Path("/testFolder1")); + fs.mkdirs(new Path("/testFolder2")); + fs.create(new Path("/testFolder1/testfile")); + fs.create(new Path("/testFolder1/testfile2")); + fs.create(new Path("/testFolder1/testfile3")); + + FileStatus[] ls = fs.listStatus(new Path("/")); + assertEquals(4, ls.length); // and user dir + + fs.delete(new Path("/"), true); + ls = fs.listStatus(new Path("/")); + assertEquals(0, ls.length); + } + + @Test(expected = FileNotFoundException.class) + public void testOpenFileAfterDelete() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.create(new Path("/testFile")); + fs.delete(new Path("/testFile"), false); + + fs.open(new Path("/testFile")); + } + + @Test(expected = FileNotFoundException.class) + public void testEnsureFileIsDeleted() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.create(new Path("testfile")); + fs.delete(new Path("testfile"), false); + + fs.getFileStatus(new Path("testfile")); + } + + @Test(expected = FileNotFoundException.class) + public void testDeleteDirectory() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.mkdirs(new Path("testfile")); + fs.mkdirs(new Path("testfile/test1")); + fs.mkdirs(new Path("testfile/test1/test2")); + + fs.delete(new Path("testfile"), true); + fs.getFileStatus(new Path("testfile")); + } + + @Test(expected = FileNotFoundException.class) + public void testDeleteFirstLevelDirectory() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final List tasks = new ArrayList<>(); + + ExecutorService es = Executors.newFixedThreadPool(10); + for (int i = 0; i < 1000; i++) { + final Path fileName = 
new Path("/test/" + i); + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + fs.create(fileName); + return null; + } + }; + + tasks.add(es.submit(callable)); + } + + for (Future task : tasks) { + task.get(); + } + + es.shutdownNow(); + fs.delete(new Path("/test"), true); + fs.getFileStatus(new Path("/test")); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java new file mode 100644 index 0000000..4985f58 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; + + +import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertArrayEquals; + +/** + * Test end to end between ABFS client and ABFS server. + */ +public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest { + private static final Path TEST_FILE = new Path("testfile"); + private static final int TEST_BYTE = 100; + private static final int TEST_OFFSET = 100; + private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; + private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900; + + public ITestAzureBlobFileSystemE2E() { + super(); + Configuration configuration = this.getConfiguration(); + configuration.set(ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, "0"); + this.getMockServiceInjector().replaceInstance(Configuration.class, configuration); + + } + + @Test + public void testWriteOneByteToFile() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + FSDataOutputStream stream = fs.create(TEST_FILE); + + stream.write(TEST_BYTE); + stream.close(); + + FileStatus fileStatus = fs.getFileStatus(TEST_FILE); + assertEquals(1, fileStatus.getLen()); + } + + @Test + public void testReadWriteBytesToFile() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + testWriteOneByteToFile(); + FSDataInputStream inputStream = fs.open(TEST_FILE, 
TEST_DEFAULT_BUFFER_SIZE); + int i = inputStream.read(); + inputStream.close(); + + assertEquals(TEST_BYTE, i); + } + + @Test (expected = IOException.class) + public void testOOBWrites() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + int readBufferSize = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getReadBufferSize(); + + fs.create(TEST_FILE); + FSDataOutputStream writeStream = fs.create(TEST_FILE); + + byte[] bytesToRead = new byte[readBufferSize]; + final byte[] b = new byte[2 * readBufferSize]; + new Random().nextBytes(b); + + writeStream.write(b); + writeStream.flush(); + writeStream.close(); + + FSDataInputStream readStream = fs.open(TEST_FILE); + readStream.read(bytesToRead, 0, readBufferSize); + + writeStream = fs.create(TEST_FILE); + writeStream.write(b); + writeStream.flush(); + writeStream.close(); + + readStream.read(bytesToRead, 0, readBufferSize); + readStream.close(); + } + + @Test + public void testWriteWithBufferOffset() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final FSDataOutputStream stream = fs.create(TEST_FILE); + + final byte[] b = new byte[1024 * 1000]; + new Random().nextBytes(b); + stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET); + stream.close(); + + final byte[] r = new byte[TEST_DEFAULT_READ_BUFFER_SIZE]; + FSDataInputStream inputStream = fs.open(TEST_FILE, TEST_DEFAULT_BUFFER_SIZE); + int result = inputStream.read(r); + + assertNotEquals(-1, result); + assertArrayEquals(r, Arrays.copyOfRange(b, TEST_OFFSET, b.length)); + + inputStream.close(); + } + + @Test + public void testReadWriteHeavyBytesToFileWithSmallerChunks() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + final FSDataOutputStream stream = fs.create(TEST_FILE); + + final byte[] writeBuffer = new byte[5 * 1000 * 1024]; + new Random().nextBytes(writeBuffer); + stream.write(writeBuffer); + stream.close(); + + final byte[] readBuffer = new byte[5 * 1000 * 1024]; + 
FSDataInputStream inputStream = fs.open(TEST_FILE, TEST_DEFAULT_BUFFER_SIZE); + int offset = 0; + while (inputStream.read(readBuffer, offset, TEST_OFFSET) > 0) { + offset += TEST_OFFSET; + } + + assertArrayEquals(readBuffer, writeBuffer); + inputStream.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java new file mode 100644 index 0000000..616253b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test end to end between ABFS client and ABFS server with heavy traffic. + */
public class ITestAzureBlobFileSystemE2EScale extends DependencyInjectedTest {
  private static final int TEN = 10;
  private static final int ONE_THOUSAND = 1000;
  private static final int BASE_SIZE = 1024;
  private static final int ONE_MB = 1024 * 1024;
  private static final int DEFAULT_WRITE_TIMES = 100;
  private static final Path TEST_FILE = new Path("testfile");

  public ITestAzureBlobFileSystemE2EScale() {
    super();
  }

  /**
   * Submits {@code DEFAULT_WRITE_TIMES} writes of one random buffer to a
   * pool of {@code TEN} threads sharing a single output stream, then checks
   * that the file length equals the total number of bytes submitted.
   *
   * @throws Exception if the file system call, a writer task or the
   *                   assertion fails
   */
  @Test
  public void testWriteHeavyBytesToFile() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE);
    final ExecutorService es = Executors.newFixedThreadPool(TEN);

    final int testWriteBufferSize = 2 * TEN * ONE_THOUSAND * BASE_SIZE;
    final byte[] b = new byte[testWriteBufferSize];
    new Random().nextBytes(b);

    try {
      final List<Future<Void>> tasks = new ArrayList<>();
      for (int i = 0; i < DEFAULT_WRITE_TIMES; i++) {
        tasks.add(es.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            stream.write(b);
            return null;
          }
        }));
      }

      // get() rethrows any writer failure as an ExecutionException.
      for (Future<Void> task : tasks) {
        task.get();
      }

      stream.close();
    } finally {
      // Always release the pool threads, even when a writer failed.
      es.shutdownNow();
    }

    final FileStatus fileStatus = fs.getFileStatus(TEST_FILE);
    // Multiply in long: the int product is only just below Integer.MAX_VALUE.
    assertEquals((long) testWriteBufferSize * DEFAULT_WRITE_TIMES, fileStatus.getLen());
  }

  /**
   * Writes one large random buffer, reads it back with a single
   * {@code read(byte[])} call and compares the contents.
   *
   * @throws Exception if the file system call or an assertion fails
   */
  @Test
  public void testReadWriteHeavyBytesToFile() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE);

    final int testBufferSize = 5 * TEN * ONE_THOUSAND * BASE_SIZE;
    final byte[] b = new byte[testBufferSize];
    new Random().nextBytes(b);
    stream.write(b);
    stream.close();

    final byte[] r = new byte[testBufferSize];
    final int result;
    try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
      result = inputStream.read(r);
    }

    assertNotEquals(-1, result);
    // expected first, actual second
    assertArrayEquals(b, r);
  }

  /**
   * Writes and reads back one large random buffer and checks that the file
   * system statistics report the same byte counts.
   *
   * @throws Exception if the file system call or an assertion fails
   */
  @Test
  public void testReadWriteHeavyBytesToFileWithStatistics() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE);
    // Reset after create() so only the write and read below are counted.
    final FileSystem.Statistics abfsStatistics = fs.getFsStatistics();
    abfsStatistics.reset();

    final int testBufferSize = 5 * TEN * ONE_THOUSAND * BASE_SIZE;
    final byte[] b = new byte[testBufferSize];
    new Random().nextBytes(b);
    stream.write(b);
    stream.close();

    final byte[] r = new byte[testBufferSize];
    try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
      inputStream.read(r);
    }

    Assert.assertEquals(r.length, abfsStatistics.getBytesRead());
    Assert.assertEquals(b.length, abfsStatistics.getBytesWritten());
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java new file mode 100644 
index 0000000..bfa662d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import org.junit.Test; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import static org.junit.Assert.assertEquals; + +/** + * Test FileStatus. 
+ */
public class ITestAzureBlobFileSystemFileStatus extends DependencyInjectedTest {
  private static final Path TEST_FILE = new Path("testFile");
  private static final Path TEST_FOLDER = new Path("testDir");

  public ITestAzureBlobFileSystemFileStatus() {
    super();
  }

  /**
   * Checks that status and listing calls on the root path complete without
   * throwing.
   *
   * @throws Exception if either call fails
   */
  @Test
  public void testEnsureStatusWorksForRoot() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();

    fs.getFileStatus(new Path("/"));
    fs.listStatus(new Path("/"));
  }

  /**
   * Creates a file and a directory and checks the permission, owner and
   * group reported for each.
   *
   * @throws Exception if a file system call or an assertion fails
   */
  @Test
  public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    // Close the stream returned by create(); it was previously left open.
    fs.create(TEST_FILE).close();
    fs.mkdirs(TEST_FOLDER);

    assertExpectedStatus(fs, TEST_FILE);
    assertExpectedStatus(fs, TEST_FOLDER);
  }

  /**
   * Asserts that {@code path} reports rwxrwxrwx permissions, the file
   * system's owner user as owner and that user's primary group as group.
   *
   * @param fs   file system under test
   * @param path path whose status is checked
   * @throws Exception if the status lookup fails
   */
  private static void assertExpectedStatus(final AzureBlobFileSystem fs, final Path path)
      throws Exception {
    final FileStatus fileStatus = fs.getFileStatus(path);
    assertEquals(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
        fileStatus.getPermission());
    // Owner is compared with owner and group with group; the earlier
    // assertions were crossed and only passed when both names were equal.
    assertEquals(fs.getOwnerUser(), fileStatus.getOwner());
    assertEquals(fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java new file mode 100644 index 0000000..8c2e8ce --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test flush operation. 
+ */
public class ITestAzureBlobFileSystemFlush extends DependencyInjectedTest {
  private static final int BASE_SIZE = 1024;
  private static final int ONE_THOUSAND = 1000;
  private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
  private static final int ONE_MB = 1024 * 1024;
  private static final int FLUSH_TIMES = 200;
  private static final int THREAD_SLEEP_TIME = 6000;
  /** Size of the writer thread pool used by the concurrent tests. */
  private static final int WRITER_THREADS = 10;
  /** Pause in milliseconds between consecutive flush calls. */
  private static final int FLUSH_PAUSE_MILLIS = 10;

  private static final Path TEST_FILE_PATH = new Path("/testfile");

  public ITestAzureBlobFileSystemFlush() {
    super();
  }

  /**
   * Writes the buffer twice, calling {@code flush()} {@code FLUSH_TIMES}
   * times after each write, then reads the file back one buffer at a time
   * and compares every chunk with the written buffer.
   *
   * @throws Exception if a file system call or an assertion fails
   */
  @Test
  public void testAbfsOutputStreamAsyncFlushWithRetainUncommitedData() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE_PATH);

    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);

    for (int i = 0; i < 2; i++) {
      stream.write(b);

      for (int j = 0; j < FLUSH_TIMES; j++) {
        stream.flush();
        Thread.sleep(FLUSH_PAUSE_MILLIS);
      }
    }

    stream.close();

    final byte[] r = new byte[TEST_BUFFER_SIZE];
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
      while (inputStream.available() != 0) {
        final int result = inputStream.read(r);

        assertNotEquals(-1, result);
        assertArrayEquals(b, r);
      }
    }
  }

  /**
   * Writes the buffer once, calls {@code hsync()} and {@code hflush()}
   * {@code FLUSH_TIMES} times, then reads the file back and compares it
   * with the written buffer.
   *
   * @throws Exception if a file system call or an assertion fails
   */
  @Test
  public void testAbfsOutputStreamSyncFlush() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE_PATH);

    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);
    stream.write(b);

    for (int i = 0; i < FLUSH_TIMES; i++) {
      stream.hsync();
      stream.hflush();
      Thread.sleep(FLUSH_PAUSE_MILLIS);
    }
    stream.close();

    final byte[] r = new byte[TEST_BUFFER_SIZE];
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
      final int result = inputStream.read(r);

      assertNotEquals(-1, result);
      assertArrayEquals(b, r);
    }
  }

  /**
   * Runs {@code FLUSH_TIMES} concurrent writes of one buffer while the test
   * thread calls {@code hsync()}, then checks the file length and the
   * bytes-written statistic.
   *
   * @throws Exception if a file system call, a writer task or an assertion
   *                   fails
   */
  @Test
  public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    final FSDataOutputStream stream = fs.create(TEST_FILE_PATH);
    // Reset after create() so only the writes below are counted.
    final FileSystem.Statistics abfsStatistics = fs.getFsStatistics();
    abfsStatistics.reset();

    final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);

    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);

    try {
      final List<Future<Void>> tasks = submitWrites(es, stream, b);

      // On each pass, hsync and pause once for every task still running;
      // stop after a pass in which every task was already done.
      boolean shouldStop = false;
      while (!shouldStop) {
        shouldStop = true;
        for (Future<Void> task : tasks) {
          if (!task.isDone()) {
            stream.hsync();
            shouldStop = false;
            Thread.sleep(THREAD_SLEEP_TIME);
          }
        }
      }

      rethrowFailures(tasks);
      stream.close();
    } finally {
      es.shutdownNow();
    }

    final FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
    assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
    assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, abfsStatistics.getBytesWritten());
  }

  /**
   * Runs {@code FLUSH_TIMES} concurrent writes of one buffer while the test
   * thread calls {@code flush()}, then checks the file length.
   *
   * @throws Exception if a file system call, a writer task or the assertion
   *                   fails
   */
  @Test
  public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    // The file is created twice, as before; the first stream is now closed
    // instead of being abandoned.
    fs.create(TEST_FILE_PATH).close();
    final FSDataOutputStream stream = fs.create(TEST_FILE_PATH);
    final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);

    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);

    try {
      final List<Future<Void>> tasks = submitWrites(es, stream, b);

      // Keep flushing without pausing until every writer has finished.
      boolean shouldStop = false;
      while (!shouldStop) {
        shouldStop = true;
        for (Future<Void> task : tasks) {
          if (!task.isDone()) {
            stream.flush();
            shouldStop = false;
          }
        }
      }
      Thread.sleep(THREAD_SLEEP_TIME);

      rethrowFailures(tasks);
      stream.close();
    } finally {
      es.shutdownNow();
    }

    final FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
    assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
  }

  /**
   * Submits {@code FLUSH_TIMES} tasks that each write {@code data} to
   * {@code stream}.
   *
   * @param es     executor that runs the writes
   * @param stream shared output stream the tasks write to
   * @param data   bytes written by each task
   * @return one future per submitted write
   */
  private static List<Future<Void>> submitWrites(final ExecutorService es,
      final FSDataOutputStream stream, final byte[] data) {
    final List<Future<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < FLUSH_TIMES; i++) {
      tasks.add(es.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          stream.write(data);
          return null;
        }
      }));
    }
    return tasks;
  }

  /**
   * Calls {@code get()} on every future so that an exception thrown by a
   * writer fails the test instead of being dropped.
   *
   * @param tasks futures returned by {@link #submitWrites}
   * @throws Exception the failure of the first task that did not complete
   *                   normally
   */
  private static void rethrowFailures(final List<Future<Void>> tasks) throws Exception {
    for (Future<Void> task : tasks) {
      task.get();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java new file mode 100644 index 0000000..d2ed400 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.FileNotFoundException; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +/** + * Test filesystem initialization and creation. 
+ */ +public class ITestAzureBlobFileSystemInitAndCreate extends DependencyInjectedTest { + public ITestAzureBlobFileSystemInitAndCreate() { + super(); + + this.getConfiguration().unset(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION); + } + + @Override + public void initialize() { + } + + @Override + public void testCleanup() { + } + + @Test (expected = FileNotFoundException.class) + public void ensureFilesystemWillNotBeCreatedIfCreationConfigIsNotSet() throws Exception { + super.initialize(); + this.getFileSystem(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org