From: stevel@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 22 Nov 2017 17:20:54 -0000
Subject: [05/14] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java new file mode 100644 index 0000000..55e3e37 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.net.URI; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase; +import org.apache.hadoop.util.Progressable; + +/** + * Relays FS calls to the mocked FS, allows for some extra logging with + * stack traces to be included, stubbing out other methods + * where needed to avoid failures. + * + * The logging is useful for tracking + * down why there are extra calls to a method than a test would expect: + * changes in implementation details often trigger such false-positive + * test failures. + * + * This class is in the s3a package so that it has access to methods + */ +public class MockS3AFileSystem extends S3AFileSystem { + public static final String BUCKET = "bucket-name"; + public static final URI FS_URI = URI.create("s3a://" + BUCKET + "/"); + protected static final Logger LOG = + LoggerFactory.getLogger(MockS3AFileSystem.class); + + private final S3AFileSystem mock; + private final Pair outcome; + + /** Log nothing: {@value}. */ + public static final int LOG_NONE = 0; + + /** Log the name of the operation any arguments: {@value}. */ + public static final int LOG_NAME = 1; + + /** Log the entire stack of where operations are called: {@value}. */ + public static final int LOG_STACK = 2; + + /** + * This can be edited to set the log level of events through the + * mock FS. + */ + private int logEvents = LOG_NAME; + private final S3AInstrumentation instrumentation = + new S3AInstrumentation(FS_URI); + private Configuration conf; + + public MockS3AFileSystem(S3AFileSystem mock, + Pair outcome) { + this.mock = mock; + this.outcome = outcome; + setUri(FS_URI); + setBucket(BUCKET); + } + + public Pair + getOutcome() { + return outcome; + } + + public int getLogEvents() { + return logEvents; + } + + public void setLogEvents(int logEvents) { + this.logEvents = logEvents; + } + + private void event(String format, Object... args) { + Throwable ex = null; + String s = String.format(format, args); + switch (logEvents) { + case LOG_STACK: + ex = new Exception(s); + /* fall through */ + case LOG_NAME: + LOG.info(s, ex); + break; + case LOG_NONE: + default: + //nothing + } + } + + @Override + public Path getWorkingDirectory() { + return new Path("s3a://" + BUCKET + "/work"); + } + + @Override + public void initialize(URI name, Configuration originalConf) + throws IOException { + conf = originalConf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public boolean isMagicCommitEnabled() { + return true; + } + + /** + * Make operation to set the s3 client public. + * @param client client. 
+ */ + @Override + public void setAmazonS3Client(AmazonS3 client) { + LOG.debug("Setting S3 client to {}", client); + super.setAmazonS3Client(client); + } + + @Override + public boolean exists(Path f) throws IOException { + event("exists(%s)", f); + return mock.exists(f); + } + + @Override + void finishedWrite(String key, long length) { + + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + event("open(%s)", f); + return mock.open(f, bufferSize); + } + + @Override + public FSDataOutputStream create(Path f, + FsPermission permission, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + event("create(%s)", f); + return mock.create(f, permission, overwrite, bufferSize, replication, + blockSize, progress); + } + + @Override + public FSDataOutputStream append(Path f, + int bufferSize, + Progressable progress) throws IOException { + return mock.append(f, bufferSize, progress); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + event("rename(%s, %s)", src, dst); + return mock.rename(src, dst); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + event("delete(%s, %s)", f, recursive); + return mock.delete(f, recursive); + } + + @Override + public FileStatus[] listStatus(Path f) + throws IOException { + event("listStatus(%s)", f); + return mock.listStatus(f); + } + + @Override + public RemoteIterator listFiles(Path f, boolean recursive) + throws IOException { + event("listFiles(%s, %s)", f, recursive); + return new EmptyIterator(); + } + + @Override + public void setWorkingDirectory(Path newDir) { + mock.setWorkingDirectory(newDir); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + event("mkdirs(%s)", f); + return mock.mkdirs(f, permission); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + event("getFileStatus(%s)", f); + return mock.getFileStatus(f); + } + + @Override + public long getDefaultBlockSize(Path f) { + return mock.getDefaultBlockSize(f); + } + + @Override + protected void incrementStatistic(Statistic statistic) { + } + + @Override + protected void incrementStatistic(Statistic statistic, long count) { + } + + @Override + protected void incrementGauge(Statistic statistic, long count) { + } + + @Override + public void incrementReadOperations() { + } + + @Override + public void incrementWriteOperations() { + } + + @Override + public void incrementPutStartStatistics(long bytes) { + } + + @Override + public void incrementPutCompletedStatistics(boolean success, long bytes) { + } + + @Override + public void incrementPutProgressStatistics(String key, long bytes) { + } + + @Override + protected void setOptionalMultipartUploadRequestParameters( + InitiateMultipartUploadRequest req) { +// no-op + } + + @Override + @SuppressWarnings("deprecation") + public long getDefaultBlockSize() { + return mock.getDefaultBlockSize(); + } + + @Override + void deleteObjectAtPath(Path f, String key, boolean isFile) + throws AmazonClientException, IOException { + deleteObject(key); + } + + @Override + void maybeCreateFakeParentDirectory(Path path) + throws IOException, AmazonClientException { + // no-op + } + + private static class EmptyIterator implements + RemoteIterator { + @Override + public boolean hasNext() throws IOException { + return false; + } + + @Override + public LocatedFileStatus next() throws IOException { + return null; + } + } + + 
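A rough sketch of how a test case might drive this relay filesystem. The names wrappedFs and outcomePair are illustrative only, and the generic parameters of the Pair constructor argument were stripped by the mail archiver, so they are left unstated here:

    // hypothetical usage sketch, assuming a Mockito-mocked S3AFileSystem
    S3AFileSystem wrappedFs = Mockito.mock(S3AFileSystem.class);
    MockS3AFileSystem fs = new MockS3AFileSystem(wrappedFs, outcomePair);
    fs.setLogEvents(MockS3AFileSystem.LOG_STACK);  // log a full stack per FS call
    fs.initialize(MockS3AFileSystem.FS_URI, new Configuration());
    fs.exists(new Path("s3a://bucket-name/job/output"));  // relayed to wrappedFs and logged
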
@Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "MockS3AFileSystem{"); + sb.append("inner mockFS=").append(mock); + sb.append('}'); + return sb.toString(); + } + + @Override + public S3AInstrumentation.CommitterStatistics newCommitterStatistics() { + return instrumentation.newCommitterStatistics(); + } + + @Override + public void operationRetried(Exception ex) { + /** no-op */ + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java index 4e25380..b746bfe5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java @@ -21,8 +21,10 @@ package org.apache.hadoop.fs.s3a; import static org.mockito.Mockito.*; import java.net.URI; +import java.util.ArrayList; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.Region; /** @@ -36,6 +38,12 @@ public class MockS3ClientFactory implements S3ClientFactory { String bucket = name.getHost(); AmazonS3 s3 = mock(AmazonS3.class); when(s3.doesBucketExist(bucket)).thenReturn(true); + // this listing is used in startup if purging is enabled, so + // return a stub value + MultipartUploadListing noUploads = new MultipartUploadListing(); + noUploads.setMultipartUploads(new ArrayList<>(0)); + when(s3.listMultipartUploads(anyObject())) + .thenReturn(noUploads); when(s3.getBucketLocation(anyString())) .thenReturn(Region.US_West.toString()); return s3; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index 2c4f009..7f7802d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -163,4 +163,10 @@ public interface S3ATestConstants { */ String CONFIGURATION_TEST_ENDPOINT = "test.fs.s3a.endpoint"; + + /** + * Property to set to disable caching. 
+ */ + String FS_S3A_IMPL_DISABLE_CACHE + = "fs.s3a.impl.disable.cache"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index b302e72..773c25a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -21,10 +21,13 @@ package org.apache.hadoop.fs.s3a; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory; import org.apache.hadoop.fs.s3a.s3guard.DynamoDBLocalClientFactory; import org.apache.hadoop.fs.s3a.s3guard.S3Guard; @@ -39,9 +42,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.List; +import java.util.stream.Collectors; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions; @@ -59,6 +66,7 @@ public final class S3ATestUtils { * a property has been unset. */ public static final String UNSET_PROPERTY = "unset"; + public static final int PURGE_DELAY_SECONDS = 60 * 60; /** * Get S3A FS name. @@ -120,15 +128,19 @@ public final class S3ATestUtils { S3AFileSystem fs1 = new S3AFileSystem(); //enable purging in tests if (purge) { - conf.setBoolean(PURGE_EXISTING_MULTIPART, true); - // but a long delay so that parallel multipart tests don't + // purge with but a delay so that parallel multipart tests don't // suddenly start timing out - conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 30 * 60); + enableMultipartPurge(conf, PURGE_DELAY_SECONDS); } fs1.initialize(testURI, conf); return fs1; } + public static void enableMultipartPurge(Configuration conf, int seconds) { + conf.setBoolean(PURGE_EXISTING_MULTIPART, true); + conf.setInt(PURGE_EXISTING_MULTIPART_AGE, seconds); + } + /** * Create a file context for tests. * @@ -287,13 +299,13 @@ public final class S3ATestUtils { * @return the exception, if it is of the expected class * @throws Exception the exception passed in. 
*/ - public static Exception verifyExceptionClass(Class clazz, + public static E verifyExceptionClass(Class clazz, Exception ex) throws Exception { if (!(ex.getClass().equals(clazz))) { throw ex; } - return ex; + return (E)ex; } /** @@ -302,7 +314,7 @@ public final class S3ATestUtils { * @param conf configuration to patch */ public static void disableFilesystemCaching(Configuration conf) { - conf.setBoolean("fs.s3a.impl.disable.cache", true); + conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, true); } /** @@ -732,4 +744,112 @@ public final class S3ATestUtils { = (S3ABlockOutputStream) out.getWrappedStream(); return blockOutputStream.getStatistics(); } + + /** + * Read in a file and convert to an ascii string. + * @param fs filesystem + * @param path path to read + * @return the bytes read and converted to a string + * @throws IOException IO problems + */ + public static String read(FileSystem fs, + Path path) throws IOException { + FileStatus status = fs.getFileStatus(path); + try (FSDataInputStream in = fs.open(path)) { + byte[] buf = new byte[(int)status.getLen()]; + in.readFully(0, buf); + return new String(buf); + } + } + + /** + * List a directory. + * @param fileSystem FS + * @param path path + * @throws IOException failure. + */ + public static void lsR(FileSystem fileSystem, Path path, boolean recursive) + throws Exception { + if (path == null) { + // surfaces when someone calls getParent() on something at the top + // of the path + LOG.info("Empty path"); + return; + } + S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive), + (status) -> LOG.info(" {}", status)); + } + + /** + * Turn on the inconsistent S3A FS client in a configuration, + * with 100% probability of inconsistency, default delays. + * For this to go live, the paths must include the element + * {@link InconsistentAmazonS3Client#DEFAULT_DELAY_KEY_SUBSTRING}. + * @param conf configuration to patch + * @param delay delay in millis + */ + public static void enableInconsistentS3Client(Configuration conf, + long delay) { + LOG.info("Enabling inconsistent S3 client"); + conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class, + S3ClientFactory.class); + conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING); + conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, delay); + conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 0.0f); + conf.setFloat(FAIL_INJECT_THROTTLE_PROBABILITY, 0.0f); + } + + /** + * Is the filesystem using the inconsistent/throttling/unreliable client? + * @param fs filesystem + * @return true if the filesystem's client is the inconsistent one. + */ + public static boolean isFaultInjecting(S3AFileSystem fs) { + return fs.getAmazonS3Client() instanceof InconsistentAmazonS3Client; + } + + /** + * Skip a test because the client is using fault injection. + * This should only be done for those tests which are measuring the cost + * of operations or otherwise cannot handle retries. + * @param fs filesystem to check + */ + public static void skipDuringFaultInjection(S3AFileSystem fs) { + Assume.assumeFalse("Skipping as filesystem has fault injection", + isFaultInjecting(fs)); + } + + /** + * Date format used for mapping upload initiation time to human string. + */ + private static final DateFormat LISTING_FORMAT = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss"); + + /** + * Get a list of all pending uploads under a prefix, one which can be printed. + * @param prefix prefix to look under + * @return possibly empty list + * @throws IOException IO failure. 
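A minimal sketch of how a test might switch on this fault injection, using only calls shown in this patch; the bucket and path names are illustrative:

    // hypothetical sketch: enable the inconsistent/throttling client for a test FS
    Configuration conf = new Configuration();
    S3ATestUtils.enableInconsistentS3Client(conf, 2000);  // 2s visibility delay
    // only keys containing DEFAULT_DELAY_KEY_SUBSTRING get the delayed listings
    Path delayed = new Path("s3a://bucket-name/"
        + InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING + "/data");
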
+ */ + public static List listMultipartUploads(S3AFileSystem fs, + String prefix) throws IOException { + + return fs + .listMultipartUploads(prefix).stream() + .map(upload -> String.format("Upload to %s with ID %s; initiated %s", + upload.getKey(), + upload.getUploadId(), + LISTING_FORMAT.format(upload.getInitiated()))) + .collect(Collectors.toList()); + } + + /** + * Skip a test if the FS isn't marked as supporting magic commits. + * @param fs filesystem + */ + public void assumeMagicCommitEnabled(S3AFileSystem fs) { + assume("Magic commit option disabled on " + fs, + fs.hasCapability(CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java new file mode 100644 index 0000000..7d17d69 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import com.google.common.base.Joiner; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.StorageStatistics; + +/** + * Class to track storage statistics of a filesystem, generate diffs. + */ +public class StorageStatisticsTracker { + + private final FileSystem fs; + private Map stats; + + public StorageStatisticsTracker(FileSystem fs) { + this.fs = fs; + snapshot(); + } + + public void mark() { + stats = snapshot(); + } + + public Map compare(Map current) { + Map diff = new HashMap<>(stats.size()); + for (Map.Entry entry : stats.entrySet()) { + String key = entry.getKey(); + Long latest = current.get(key); + if (latest != null && !latest.equals(entry.getValue())) { + diff.put(key, entry.getValue() - latest); + } + } + return diff; + } + + public Map compareToCurrent() { + return compare(snapshot()); + } + + public String toString(Map map) { + return Joiner.on("\n").withKeyValueSeparator("=").join(map); + } + + public Map snapshot() { + StatsIterator values = latestValues(); + Map snapshot = new HashMap<>( + stats == null ? 0 : stats.size()); + for (StorageStatistics.LongStatistic value : values) { + snapshot.put(value.getName(), value.getValue()); + } + return snapshot; + } + + public StatsIterator latestValues() { + return new StatsIterator(fs.getStorageStatistics()); + } + + /** + * Provide an iterator to the stats. 
+ */ + public static class StatsIterator implements + Iterable { + private final StorageStatistics statistics; + + public StatsIterator(StorageStatistics statistics) { + this.statistics = statistics; + } + + @Override + public Iterator iterator() { + return statistics.getLongStatistics(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java new file mode 100644 index 0000000..d29e2df --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkBaseException; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.ConnectTimeoutException; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.Invoker.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.test.LambdaTestUtils.*; + +/** + * Test the {@link Invoker} code and the associated {@link S3ARetryPolicy}. + * + * Some of the tests look at how Connection Timeout Exceptions are processed. + * Because of how the AWS libraries shade the classes, there have been some + * regressions here during development. These tests are intended to verify that + * the current match process based on classname works. + */ +@SuppressWarnings("ThrowableNotThrown") +public class TestInvoker extends Assert { + + /** Configuration to use for short retry intervals. 
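The tracker above is built around a mark/compare pattern; the following hypothetical fragment shows the intent (the Map generics were stripped by the archiver, Map<String, Long> is the evident type):

    // hypothetical sketch: diff filesystem statistics around one operation
    StorageStatisticsTracker tracker = new StorageStatisticsTracker(fs);
    tracker.mark();                                   // snapshot current counters
    fs.listStatus(path);                              // operation under test
    Map<String, Long> changed = tracker.compareToCurrent();
    LOG.info("Statistics changed by the listing:\n{}", tracker.toString(changed));
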
*/ + private static final Configuration FAST_RETRY_CONF; + private static final ConnectTimeoutException + HADOOP_CONNECTION_TIMEOUT_EX = new ConnectTimeoutException("hadoop"); + private static final Local.ConnectTimeoutException + LOCAL_CONNECTION_TIMEOUT_EX + = new Local.ConnectTimeoutException("local"); + private static final org.apache.http.conn.ConnectTimeoutException + HTTP_CONNECTION_TIMEOUT_EX + = new org.apache.http.conn.ConnectTimeoutException("apache"); + private static final SocketTimeoutException SOCKET_TIMEOUT_EX + = new SocketTimeoutException("socket"); + + /** + * What retry limit to use. + */ + private static final int ACTIVE_RETRY_LIMIT = RETRY_LIMIT_DEFAULT; + + /** + * A retry count guaranteed to be out of range. + */ + private static final int RETRIES_TOO_MANY = ACTIVE_RETRY_LIMIT + 10; + + /** + * A count of retry attempts guaranteed to be within the permitted range. + */ + public static final int SAFE_RETRY_COUNT = 5; + + static { + FAST_RETRY_CONF = new Configuration(); + String interval = "10ms"; + FAST_RETRY_CONF.set(RETRY_INTERVAL, interval); + FAST_RETRY_CONF.set(RETRY_THROTTLE_INTERVAL, interval); + FAST_RETRY_CONF.setInt(RETRY_LIMIT, ACTIVE_RETRY_LIMIT); + FAST_RETRY_CONF.setInt(RETRY_THROTTLE_LIMIT, ACTIVE_RETRY_LIMIT); + } + + private static final S3ARetryPolicy RETRY_POLICY = + new S3ARetryPolicy(FAST_RETRY_CONF); + + private int retryCount; + private Invoker invoker = new Invoker(RETRY_POLICY, + (text, e, retries, idempotent) -> retryCount++); + private static final AmazonClientException CLIENT_TIMEOUT_EXCEPTION = + new AmazonClientException(new Local.ConnectTimeoutException("timeout")); + private static final AmazonServiceException BAD_REQUEST = serviceException( + AWSBadRequestException.STATUS_CODE, + "bad request"); + + @Before + public void setup() { + resetCounters(); + } + + private static AmazonServiceException serviceException(int code, + String text) { + AmazonServiceException ex = new AmazonServiceException(text); + ex.setStatusCode(code); + return ex; + } + + private static AmazonS3Exception createS3Exception(int code) { + return createS3Exception(code, "", null); + } + + private static AmazonS3Exception createS3Exception(int code, + String message, + Throwable inner) { + AmazonS3Exception ex = new AmazonS3Exception(message); + ex.setStatusCode(code); + ex.initCause(inner); + return ex; + } + + protected void verifyTranslated( + int status, + Class expected) throws Exception { + verifyTranslated(expected, createS3Exception(status)); + } + + private static E verifyTranslated(Class clazz, + SdkBaseException exception) throws Exception { + return verifyExceptionClass(clazz, + translateException("test", "/", exception)); + } + + private void resetCounters() { + retryCount = 0; + } + + @Test + public void test503isThrottled() throws Exception { + verifyTranslated(503, AWSServiceThrottledException.class); + } + + @Test + public void testS3500isStatus500Exception() throws Exception { + verifyTranslated(500, AWSStatus500Exception.class); + } + + @Test + public void test500isStatus500Exception() throws Exception { + AmazonServiceException ex = new AmazonServiceException(""); + ex.setStatusCode(500); + verifyTranslated(AWSStatus500Exception.class, + ex); + } + + @Test(expected = org.apache.hadoop.net.ConnectTimeoutException.class) + public void testExtractConnectTimeoutException() throws Throwable { + throw extractException("", "", + new ExecutionException( + new AmazonClientException(LOCAL_CONNECTION_TIMEOUT_EX))); + } + + @Test(expected = 
SocketTimeoutException.class) + public void testExtractSocketTimeoutException() throws Throwable { + throw extractException("", "", + new ExecutionException( + new AmazonClientException(SOCKET_TIMEOUT_EX))); + } + + /** + * Make an assertion about a retry policy. + * @param text text for error message + * @param policy policy to check against + * @param expected expected action + * @param ex exception to analyze + * @param retries number of times there has already been a retry + * @param idempotent whether or not the operation is idempotent + * @throws Exception if the retry policy raises it + * @throws AssertionError if the returned action was not that expected. + */ + private void assertRetryAction(String text, + S3ARetryPolicy policy, + RetryPolicy.RetryAction expected, + Exception ex, + int retries, + boolean idempotent) throws Exception { + RetryPolicy.RetryAction outcome = policy.shouldRetry(ex, retries, 0, + idempotent); + if (!expected.action.equals(outcome.action)) { + throw new AssertionError( + String.format( + "%s Expected action %s from shouldRetry(%s, %s, %s), but got" + + " %s", + text, + expected, ex.toString(), retries, idempotent, + outcome.action), + ex); + } + } + + @Test + public void testRetryThrottled() throws Throwable { + S3ARetryPolicy policy = RETRY_POLICY; + IOException ex = translateException("GET", "/", newThrottledException()); + + assertRetryAction("Expected retry on first throttle", + policy, RetryPolicy.RetryAction.RETRY, + ex, 0, true); + int retries = SAFE_RETRY_COUNT; + assertRetryAction("Expected retry on repeated throttle", + policy, RetryPolicy.RetryAction.RETRY, + ex, retries, true); + assertRetryAction("Expected retry on non-idempotent throttle", + policy, RetryPolicy.RetryAction.RETRY, + ex, retries, false); + } + + @Test + public void testRetryThrottledDDB() throws Throwable { + assertRetryAction("Expected retry on connection timeout", + RETRY_POLICY, RetryPolicy.RetryAction.RETRY, + new ProvisionedThroughputExceededException("IOPs"), 1, false); + } + + + protected AmazonServiceException newThrottledException() { + return serviceException( + AWSServiceThrottledException.STATUS_CODE, "throttled"); + } + + /** + * Repeatedly retry until a throttle eventually stops being raised. + */ + @Test + public void testRetryOnThrottle() throws Throwable { + + final AtomicInteger counter = new AtomicInteger(0); + invoker.retry("test", null, false, + () -> { + if (counter.incrementAndGet() < 5) { + throw newThrottledException(); + } + }); + } + + /** + * Non-idempotent operations fail on anything which isn't a throttle + * or connectivity problem. + */ + @Test(expected = AWSBadRequestException.class) + public void testNoRetryOfBadRequestNonIdempotent() throws Throwable { + invoker.retry("test", null, false, + () -> { + throw serviceException(400, "bad request"); + }); + } + + /** + * AWS nested socket problems. + */ + @Test + public void testRetryAWSConnectivity() throws Throwable { + final AtomicInteger counter = new AtomicInteger(0); + invoker.retry("test", null, false, + () -> { + if (counter.incrementAndGet() < ACTIVE_RETRY_LIMIT) { + throw CLIENT_TIMEOUT_EXCEPTION; + } + }); + assertEquals(ACTIVE_RETRY_LIMIT, counter.get()); + } + + /** + * Repeatedly retry until eventually a bad request succeeds. 
+ */ + @Test + public void testRetryBadRequestIdempotent() throws Throwable { + final AtomicInteger counter = new AtomicInteger(0); + final int attemptsBeforeSuccess = ACTIVE_RETRY_LIMIT; + invoker.retry("test", null, true, + () -> { + if (counter.incrementAndGet() < attemptsBeforeSuccess) { + throw BAD_REQUEST; + } + }); + assertEquals(attemptsBeforeSuccess, counter.get()); + assertEquals("retry count ", attemptsBeforeSuccess - 1, retryCount); + } + + @Test + public void testConnectionRetryPolicyIdempotent() throws Throwable { + assertRetryAction("Expected retry on connection timeout", + RETRY_POLICY, RetryPolicy.RetryAction.RETRY, + HADOOP_CONNECTION_TIMEOUT_EX, 1, true); + assertRetryAction("Expected connection timeout failure", + RETRY_POLICY, RetryPolicy.RetryAction.FAIL, + HADOOP_CONNECTION_TIMEOUT_EX, RETRIES_TOO_MANY, true); + } + + /** + * Even on a non-idempotent call, connection failures are considered + * retryable. + */ + @Test + public void testConnectionRetryPolicyNonIdempotent() throws Throwable { + assertRetryAction("Expected retry on connection timeout", + RETRY_POLICY, RetryPolicy.RetryAction.RETRY, + HADOOP_CONNECTION_TIMEOUT_EX, 1, false); + } + + /** + * Interrupted IOEs are not retryable. + */ + @Test + public void testInterruptedIOExceptionRetry() throws Throwable { + assertRetryAction("Expected retry on connection timeout", + RETRY_POLICY, RetryPolicy.RetryAction.FAIL, + new InterruptedIOException("interrupted"), 1, false); + } + + @Test + public void testUnshadedConnectionTimeoutExceptionMatching() + throws Throwable { + // connection timeout exceptions are special, but as AWS shades + // theirs, we need to string match them + verifyTranslated(ConnectTimeoutException.class, + new AmazonClientException(HTTP_CONNECTION_TIMEOUT_EX)); + } + + @Test + public void testShadedConnectionTimeoutExceptionMatching() throws Throwable { + // connection timeout exceptions are special, but as AWS shades + // theirs, we need to string match them + verifyTranslated(ConnectTimeoutException.class, + new AmazonClientException(LOCAL_CONNECTION_TIMEOUT_EX)); + } + + @Test + public void testShadedConnectionTimeoutExceptionNotMatching() + throws Throwable { + InterruptedIOException ex = verifyTranslated(InterruptedIOException.class, + new AmazonClientException(new Local.NotAConnectTimeoutException())); + if (ex instanceof ConnectTimeoutException) { + throw ex; + } + } + + /** + * Test that NPEs aren't retried. Also verify that the + * catch counts are incremented, while the retry count isn't. + */ + @Test + public void testNPEsNotRetried() throws Throwable { + assertRetryAction("Expected NPE trigger failure", + RETRY_POLICY, RetryPolicy.RetryAction.FAIL, + new NullPointerException("oops"), 1, true); + // catch notification didn't see it + assertEquals("retry count ", 0, retryCount); + } + + /** + * Container for the local exceptions, to help keep visible which + * specific class of exception. + */ + private static class Local { + + /** + * A local exception with a name to match the expected one. + */ + private static class ConnectTimeoutException + extends InterruptedIOException { + ConnectTimeoutException(String s) { + super(s); + } + } + + /** + * A local exception whose name should not match. 
+ */ + private static class NotAConnectTimeoutException + extends InterruptedIOException { + + } + + } + + @Test + public void testQuietlyVoid() { + quietlyEval("", "", + () -> { + throw HADOOP_CONNECTION_TIMEOUT_EX; + }); + } + + @Test + public void testQuietlyEvalReturnValueSuccess() { + assertOptionalEquals("quietly", 3, + quietlyEval("", "", () -> 3)); + } + + @Test + public void testQuietlyEvalReturnValueFail() { + // use a variable so IDEs don't warn of numeric overflows + int d = 0; + assertOptionalUnset("quietly", + quietlyEval("", "", () -> 3 / d)); + } + + @Test + public void testDynamoDBThrottleConversion() throws Throwable { + ProvisionedThroughputExceededException exceededException = + new ProvisionedThroughputExceededException("iops"); + AWSServiceThrottledException ddb = verifyTranslated( + AWSServiceThrottledException.class, exceededException); + assertTrue(isThrottleException(exceededException)); + assertTrue(isThrottleException(ddb)); + assertRetryAction("Expected throttling retry", + RETRY_POLICY, + RetryPolicy.RetryAction.RETRY, + ddb, SAFE_RETRY_COUNT, false); + // and briefly attempt an operation + CatchCallback catcher = new CatchCallback(); + AtomicBoolean invoked = new AtomicBoolean(false); + invoker.retry("test", null, false, catcher, + () -> { + if (!invoked.getAndSet(true)) { + throw exceededException; + } + }); + // to verify that the ex was translated by the time it + // got to the callback + verifyExceptionClass(AWSServiceThrottledException.class, + catcher.lastException); + } + + /** + * Catch the exception and preserve it for later queries. + */ + private static final class CatchCallback implements Retried { + private IOException lastException; + @Override + public void onFailure(String text, + IOException exception, + int retries, + boolean idempotent) { + lastException = exception; + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java index e647327..39a5e3b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java @@ -32,7 +32,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; -import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL; +import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.Listing.ProvidedFileStatusIterator; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java index e548ac2..9b86595 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java @@ -27,6 +27,7 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import 
java.io.InterruptedIOException; +import java.net.SocketTimeoutException; import java.nio.file.AccessDeniedException; import java.util.Collections; import java.util.Map; @@ -38,58 +39,87 @@ import com.amazonaws.services.s3.model.AmazonS3Exception; import org.junit.Test; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; + /** - * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions. + * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions, + * and retry/recovery policies. */ +@SuppressWarnings("ThrowableNotThrown") public class TestS3AExceptionTranslation { + private static final org.apache.http.conn.ConnectTimeoutException + HTTP_CONNECTION_TIMEOUT_EX + = new org.apache.http.conn.ConnectTimeoutException("apache"); + private static final SocketTimeoutException SOCKET_TIMEOUT_EX + = new SocketTimeoutException("socket"); + @Test public void test301ContainsEndpoint() throws Exception { - AmazonS3Exception s3Exception = createS3Exception("wrong endpoint", 301, + String bucket = "bucket.s3-us-west-2.amazonaws.com"; + int sc301 = 301; + AmazonS3Exception s3Exception = createS3Exception("wrong endpoint", sc301, Collections.singletonMap(S3AUtils.ENDPOINT_KEY, - "bucket.s3-us-west-2.amazonaws.com")); - AWSS3IOException ex = (AWSS3IOException)verifyTranslated( - AWSS3IOException.class, s3Exception); - assertEquals(301, ex.getStatusCode()); + bucket)); + AWSRedirectException ex = verifyTranslated( + AWSRedirectException.class, s3Exception); + assertStatusCode(sc301, ex); assertNotNull(ex.getMessage()); - assertTrue(ex.getMessage().contains("bucket.s3-us-west-2.amazonaws.com")); - assertTrue(ex.getMessage().contains(ENDPOINT)); + + assertContained(ex.getMessage(), bucket); + assertContained(ex.getMessage(), ENDPOINT); + assertExceptionContains(ENDPOINT, ex, "endpoint"); + assertExceptionContains(bucket, ex, "bucket name"); + } + + protected void assertContained(String text, String contained) { + assertTrue("string \""+ contained + "\" not found in \"" + text + "\"", + text != null && text.contains(contained)); + } + + protected void verifyTranslated( + int status, + Class expected) throws Exception { + verifyTranslated(expected, createS3Exception(status)); + } + + @Test + public void test400isBad() throws Exception { + verifyTranslated(400, AWSBadRequestException.class); } @Test public void test401isNotPermittedFound() throws Exception { - verifyTranslated(AccessDeniedException.class, - createS3Exception(401)); + verifyTranslated(401, AccessDeniedException.class); } @Test public void test403isNotPermittedFound() throws Exception { - verifyTranslated(AccessDeniedException.class, - createS3Exception(403)); + verifyTranslated(403, AccessDeniedException.class); } @Test public void test404isNotFound() throws Exception { - verifyTranslated(FileNotFoundException.class, createS3Exception(404)); + verifyTranslated(404, FileNotFoundException.class); } @Test public void test410isNotFound() throws Exception { - verifyTranslated(FileNotFoundException.class, createS3Exception(410)); + verifyTranslated(410, FileNotFoundException.class); } @Test public void test416isEOF() throws Exception { - verifyTranslated(EOFException.class, createS3Exception(416)); + verifyTranslated(416, EOFException.class); } @Test public void testGenericS3Exception() throws Exception { // S3 exception of no known type - AWSS3IOException ex = (AWSS3IOException)verifyTranslated( + AWSS3IOException ex = verifyTranslated( AWSS3IOException.class, createS3Exception(451)); - 
assertEquals(451, ex.getStatusCode()); + assertStatusCode(451, ex); } @Test @@ -97,10 +127,19 @@ public class TestS3AExceptionTranslation { // service exception of no known type AmazonServiceException ase = new AmazonServiceException("unwind"); ase.setStatusCode(500); - AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated( - AWSServiceIOException.class, + AWSServiceIOException ex = verifyTranslated( + AWSStatus500Exception.class, ase); - assertEquals(500, ex.getStatusCode()); + assertStatusCode(500, ex); + } + + protected void assertStatusCode(int expected, AWSServiceIOException ex) { + assertNotNull("Null exception", ex); + if (expected != ex.getStatusCode()) { + throw new AssertionError("Expected status code " + expected + + "but got " + ex.getStatusCode(), + ex); + } } @Test @@ -122,7 +161,7 @@ public class TestS3AExceptionTranslation { return source; } - private static Exception verifyTranslated(Class clazz, + private static E verifyTranslated(Class clazz, AmazonClientException exception) throws Exception { return verifyExceptionClass(clazz, translateException("test", "/", exception)); @@ -130,7 +169,8 @@ public class TestS3AExceptionTranslation { private void assertContainsInterrupted(boolean expected, Throwable thrown) throws Throwable { - if (containsInterruptedException(thrown) != expected) { + boolean wasInterrupted = containsInterruptedException(thrown) != null; + if (wasInterrupted != expected) { throw thrown; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java new file mode 100644 index 0000000..267d4df --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.stream.Collectors; + +import com.amazonaws.services.s3.AmazonS3; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** + * Base test suite for committer operations. + * + * By default, these tests enable the inconsistent committer, with + * a delay of {@link #CONSISTENCY_DELAY}; they may also have throttling + * enabled/disabled. + * + * Important: all filesystem probes will have to wait for + * the FS inconsistency delays and handle things like throttle exceptions, + * or disable throttling and fault injection before the probe. + * + */ +public abstract class AbstractCommitITest extends AbstractS3ATestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractCommitITest.class); + + protected static final int CONSISTENCY_DELAY = 500; + protected static final int CONSISTENCY_PROBE_INTERVAL = 500; + protected static final int CONSISTENCY_WAIT = CONSISTENCY_DELAY * 2; + + private InconsistentAmazonS3Client inconsistentClient; + + /** + * Should the inconsistent S3A client be used? + * Default value: true. + * @return true for inconsistent listing + */ + public boolean useInconsistentClient() { + return true; + } + + /** + * switch to an inconsistent path if in inconsistent mode. + * {@inheritDoc} + */ + @Override + protected Path path(String filepath) throws IOException { + return useInconsistentClient() ? + super.path(InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING + + "/" + filepath) + : super.path(filepath); + } + + /** + * Creates a configuration for commit operations: commit is enabled in the FS + * and output is multipart to on-heap arrays. + * @return a configuration to use when creating an FS. + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); + conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY); + if (useInconsistentClient()) { + enableInconsistentS3Client(conf, CONSISTENCY_DELAY); + } + return conf; + } + + /** + * Get the log; can be overridden for test case log. + * @return a log. + */ + public Logger log() { + return LOG; + } + + /*** + * Bind to the named committer. 
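Concrete committer suites extend this base class; a minimal hypothetical subclass that opts out of fault injection (class name illustrative, not part of the patch) would look like:

    // hypothetical sketch: a committer test needing the consistent, unthrottled client
    public class ITestMyCommitterOperations extends AbstractCommitITest {
      @Override
      public boolean useInconsistentClient() {
        return false;  // setup() then skips the InconsistentAmazonS3Client assertion
      }
    }
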
+ * + * @param conf configuration + * @param factory factory name + * @param committerName committer; set if non null/empty + */ + protected void bindCommitter(Configuration conf, String factory, + String committerName) { + conf.set(S3A_COMMITTER_FACTORY_KEY, factory); + if (StringUtils.isNotEmpty(committerName)) { + conf.set(FS_S3A_COMMITTER_NAME, committerName); + } + } + + /** + * Clean up a directory. + * Waits for consistency if needed + * @param dir directory + * @param conf configuration + * @throws IOException failure + */ + public void rmdir(Path dir, Configuration conf) throws IOException { + if (dir != null) { + describe("deleting %s", dir); + FileSystem fs = dir.getFileSystem(conf); + waitForConsistency(); + fs.delete(dir, true); + waitForConsistency(); + } + } + + /** + * Setup will use inconsistent client if {@link #useInconsistentClient()} + * is true. + * @throws Exception failure. + */ + @Override + public void setup() throws Exception { + super.setup(); + if (useInconsistentClient()) { + AmazonS3 client = getFileSystem() + .getAmazonS3ClientForTesting("fault injection"); + Assert.assertTrue( + "AWS client is not inconsistent, even though the test requirees it " + + client, + client instanceof InconsistentAmazonS3Client); + inconsistentClient = (InconsistentAmazonS3Client) client; + } + } + + /** + * Teardown waits for the consistency delay and resets failure count, so + * FS is stable, before the superclass teardown is called. This + * should clean things up better. + * @throws Exception failure. + */ + @Override + public void teardown() throws Exception { + waitForConsistency(); + // make sure there are no failures any more + resetFailures(); + super.teardown(); + } + + /** + * Wait a multiple of the inconsistency delay for things to stabilize; + * no-op if the consistent client is used. + * @throws InterruptedIOException if the sleep is interrupted + */ + protected void waitForConsistency() throws InterruptedIOException { + if (useInconsistentClient() && inconsistentClient != null) { + try { + Thread.sleep(2* inconsistentClient.getDelayKeyMsec()); + } catch (InterruptedException e) { + throw (InterruptedIOException) + (new InterruptedIOException("while waiting for consistency: " + e) + .initCause(e)); + } + } + } + + /** + * Set the throttling factor on requests. + * @param p probability of a throttling occurring: 0-1.0 + */ + protected void setThrottling(float p) { + inconsistentClient.setThrottleProbability(p); + } + + /** + * Set the throttling factor on requests and number of calls to throttle. + * @param p probability of a throttling occurring: 0-1.0 + * @param limit limit to number of calls which fail + */ + protected void setThrottling(float p, int limit) { + inconsistentClient.setThrottleProbability(p); + setFailureLimit(limit); + } + + /** + * Turn off throttling. + */ + protected void resetFailures() { + if (inconsistentClient != null) { + setThrottling(0, 0); + } + } + + /** + * Set failure limit. + * @param limit limit to number of calls which fail + */ + private void setFailureLimit(int limit) { + inconsistentClient.setFailureLimit(limit); + } + + /** + * Abort all multipart uploads under a path. + * @param path path for uploads to abort; may be null + * @return a count of aborts + * @throws IOException trouble. 
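As the class javadoc warns, probes must either tolerate throttling or disable it first; a hypothetical fragment from a test body showing that pattern:

    // hypothetical sketch: throttle a bounded number of requests, then quiesce
    setThrottling(1.0f, 2);                 // all requests throttled, at most 2 failures
    try {
      getFileSystem().getFileStatus(path);  // expected to retry through the throttles
    } finally {
      resetFailures();                      // restore a quiet client before later probes
    }
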
+ */ + protected int abortMultipartUploadsUnderPath(Path path) throws IOException { + S3AFileSystem fs = getFileSystem(); + if (fs != null && path != null) { + String key = fs.pathToKey(path); + WriteOperationHelper writeOps = fs.createWriteOperationHelper(); + int count = writeOps.abortMultipartUploadsUnderPath(key); + if (count > 0) { + log().info("Multipart uploads deleted: {}", count); + } + return count; + } else { + return 0; + } + } + + /** + * Assert that there *are* pending MPUs. + * @param path path to look under + * @throws IOException IO failure + */ + protected void assertMultipartUploadsPending(Path path) throws IOException { + assertTrue("No multipart uploads in progress under " + path, + countMultipartUploads(path) > 0); + } + + /** + * Assert that there *are no* pending MPUs; assertion failure will include + * the list of pending writes. + * @param path path to look under + * @throws IOException IO failure + */ + protected void assertNoMultipartUploadsPending(Path path) throws IOException { + List uploads = listMultipartUploads(getFileSystem(), + pathToPrefix(path)); + if (!uploads.isEmpty()) { + String result = uploads.stream().collect(Collectors.joining("\n")); + fail("Multipart uploads in progress under " + path + " \n" + result); + } + } + + /** + * Count the number of MPUs under a path. + * @param path path to scan + * @return count + * @throws IOException IO failure + */ + protected int countMultipartUploads(Path path) throws IOException { + return countMultipartUploads(pathToPrefix(path)); + } + + /** + * Count the number of MPUs under a prefix; also logs them. + * @param prefix prefix to scan + * @return count + * @throws IOException IO failure + */ + protected int countMultipartUploads(String prefix) throws IOException { + return listMultipartUploads(getFileSystem(), prefix).size(); + } + + /** + * Map from a path to a prefix. + * @param path path + * @return the key + */ + private String pathToPrefix(Path path) { + return path == null ? "" : + getFileSystem().pathToKey(path); + } + + /** + * Verify that the specified dir has the {@code _SUCCESS} marker + * and that it can be loaded. + * The contents will be logged and returned. + * @param dir directory to scan + * @return the loaded success data + * @throws IOException IO Failure + */ + protected SuccessData verifySuccessMarker(Path dir) throws IOException { + assertPathExists("Success marker", + new Path(dir, _SUCCESS)); + SuccessData successData = loadSuccessMarker(dir); + log().info("Success data {}", successData.toString()); + log().info("Metrics\n{}", + successData.dumpMetrics(" ", " = ", "\n")); + log().info("Diagnostics\n{}", + successData.dumpDiagnostics(" ", " = ", "\n")); + return successData; + } + + /** + * Load the success marker and return the data inside it. + * @param dir directory containing the marker + * @return the loaded data + * @throws IOException on any failure to load or validate the data + */ + protected SuccessData loadSuccessMarker(Path dir) throws IOException { + return SuccessData.load(getFileSystem(), new Path(dir, _SUCCESS)); + } + + /** + * Read a UTF-8 file. + * @param path path to read + * @return string value + * @throws IOException IO failure + */ + protected String readFile(Path path) throws IOException { + return ContractTestUtils.readUTF8(getFileSystem(), path, -1); + } + + /** + * Assert that the given dir does not have the {@code _SUCCESS} marker. 
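A short hypothetical fragment combining the helpers above after a committed job; outputPath is illustrative:

    // hypothetical sketch: a committed job leaves a loadable _SUCCESS marker
    // and no dangling multipart uploads
    SuccessData success = verifySuccessMarker(outputPath);
    LOG.info("Diagnostics:\n{}", success.dumpDiagnostics(" ", " = ", "\n"));
    assertNoMultipartUploadsPending(outputPath);
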
+ * @param dir dir to scan + * @throws IOException IO Failure + */ + protected void assertSuccessMarkerDoesNotExist(Path dir) throws IOException { + assertPathDoesNotExist("Success marker", + new Path(dir, _SUCCESS)); + } + + /** + * Closeable which can be used to safely close writers in + * a try-with-resources block.. + */ + protected static class CloseWriter implements AutoCloseable{ + + private final RecordWriter writer; + private final TaskAttemptContext context; + + public CloseWriter(RecordWriter writer, + TaskAttemptContext context) { + this.writer = writer; + this.context = context; + } + + @Override + public void close() { + try { + writer.close(context); + } catch (IOException | InterruptedException e) { + LOG.error("When closing {} on context {}", + writer, context, e); + + } + } + } + + /** + * Create a task attempt for a Job. This is based on the code + * run in the MR AM, creating a task (0) for the job, then a task + * attempt (0). + * @param jobId job ID + * @param jContext job context + * @return the task attempt. + */ + public static TaskAttemptContext taskAttemptForJob(JobId jobId, + JobContext jContext) { + org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = + MRBuilderUtils.newTaskId(jobId, 0, + org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + MRBuilderUtils.newTaskAttemptId(taskID, 0); + return new TaskAttemptContextImpl( + jContext.getConfiguration(), + TypeConverter.fromYarn(attemptID)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java new file mode 100644 index 0000000..13dfd83 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Sets; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.service.ServiceOperations; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; + +/** Full integration test of an MR job. */ +public abstract class AbstractITCommitMRJob extends AbstractCommitITest { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractITCommitMRJob.class); + + private static final int TEST_FILE_COUNT = 2; + private static final int SCALE_TEST_FILE_COUNT = 20; + + private static MiniDFSClusterService hdfs; + private static MiniMRYarnCluster yarn = null; + private static JobConf conf = null; + private boolean uniqueFilenames = false; + private boolean scaleTest; + + protected static FileSystem getDFS() { + return hdfs.getClusterFS(); + } + + @BeforeClass + public static void setupClusters() throws IOException { + // the HDFS and YARN clusters share the same configuration, so + // the HDFS cluster binding is implicitly propagated to YARN + conf = new JobConf(); + conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); + conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE); + + hdfs = new MiniDFSClusterService(); + hdfs.init(conf); + hdfs.start(); + yarn = new MiniMRYarnCluster("ITCommitMRJob", 2); + yarn.init(conf); + yarn.start(); + } + + @SuppressWarnings("ThrowableNotThrown") + @AfterClass + public static void teardownClusters() throws IOException { + conf = null; + ServiceOperations.stopQuietly(yarn); + ServiceOperations.stopQuietly(hdfs); + hdfs = null; + yarn = null; + } + + public static MiniDFSCluster getHdfs() { + return hdfs.getCluster(); + } + + public static FileSystem getLocalFS() { + return hdfs.getLocalFS(); + } + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + /** + * The name of the committer as returned by + * {@link AbstractS3ACommitter#getName()} and used for committer construction. 
+ */ + protected abstract String committerName(); + + @Override + public void setup() throws Exception { + super.setup(); + scaleTest = getTestPropertyBool( + getConfiguration(), + KEY_SCALE_TESTS_ENABLED, + DEFAULT_SCALE_TESTS_ENABLED); + } + + @Override + protected int getTestTimeoutMillis() { + return SCALE_TEST_TIMEOUT_SECONDS * 1000; + } + + @Test + public void testMRJob() throws Exception { + S3AFileSystem fs = getFileSystem(); + // final dest is in S3A + Path outputPath = path("testMRJob"); + + String commitUUID = UUID.randomUUID().toString(); + String suffix = uniqueFilenames ? ("-" + commitUUID) : ""; + int numFiles = getTestFileCount(); + List expectedFiles = new ArrayList<>(numFiles); + Set expectedKeys = Sets.newHashSet(); + for (int i = 0; i < numFiles; i += 1) { + File file = temp.newFile(String.valueOf(i) + ".text"); + try (FileOutputStream out = new FileOutputStream(file)) { + out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); + } + String filename = String.format("part-m-%05d%s", i, suffix); + Path path = new Path(outputPath, filename); + expectedFiles.add(path.toString()); + expectedKeys.add("/" + fs.pathToKey(path)); + } + Collections.sort(expectedFiles); + + Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job"); + JobConf jobConf = (JobConf) mrJob.getConfiguration(); + jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + uniqueFilenames); + + + bindCommitter(jobConf, + CommitConstants.S3A_COMMITTER_FACTORY, + committerName()); + // pass down the scale test flag + jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest); + + mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); + FileOutputFormat.setOutputPath(mrJob, outputPath); + + File mockResultsFile = temp.newFile("committer.bin"); + mockResultsFile.delete(); + String committerPath = "file:" + mockResultsFile; + jobConf.set("mock-results-file", committerPath); + jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); + + mrJob.setInputFormatClass(TextInputFormat.class); + FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI())); + + mrJob.setMapperClass(MapClass.class); + mrJob.setNumReduceTasks(0); + + // an attempt to set up log4j properly, which clearly doesn't work + URL log4j = getClass().getClassLoader().getResource("log4j.properties"); + if (log4j != null && log4j.getProtocol().equals("file")) { + Path log4jPath = new Path(log4j.toURI()); + LOG.debug("Using log4j path {}", log4jPath); + mrJob.addFileToClassPath(log4jPath); + String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s", + log4j); + jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops); + jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops); + } + + applyCustomConfigOptions(jobConf); + // fail fast if anything goes wrong + mrJob.setMaxMapAttempts(1); + + mrJob.submit(); + try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) { + boolean succeeded = mrJob.waitForCompletion(true); + assertTrue("MR job failed", succeeded); + } + + waitForConsistency(); + assertIsDirectory(outputPath); + FileStatus[] results = fs.listStatus(outputPath, + S3AUtils.HIDDEN_FILE_FILTER); + int fileCount = results.length; + List actualFiles = new ArrayList<>(fileCount); + assertTrue("No files in output directory", fileCount != 0); + LOG.info("Found {} files", fileCount); + for (FileStatus result : results) { + LOG.debug("result: {}", result); + actualFiles.add(result.getPath().toString()); + } + Collections.sort(actualFiles); + + // load in the success data marker: this guarantees that a s3guard + 
// committer was used + Path success = new Path(outputPath, _SUCCESS); + FileStatus status = fs.getFileStatus(success); + assertTrue("0 byte success file - not a s3guard committer " + success, + status.getLen() > 0); + SuccessData successData = SuccessData.load(fs, success); + String commitDetails = successData.toString(); + LOG.info("Committer name " + committerName() + "\n{}", + commitDetails); + LOG.info("Committer statistics: \n{}", + successData.dumpMetrics(" ", " = ", "\n")); + LOG.info("Diagnostics\n{}", + successData.dumpDiagnostics(" ", " = ", "\n")); + assertEquals("Wrong committer in " + commitDetails, + committerName(), successData.getCommitter()); + List successFiles = successData.getFilenames(); + assertTrue("No filenames in " + commitDetails, + !successFiles.isEmpty()); + + assertEquals("Should commit the expected files", + expectedFiles, actualFiles); + + Set summaryKeys = Sets.newHashSet(); + summaryKeys.addAll(successFiles); + assertEquals("Summary keyset doesn't list the the expected paths " + + commitDetails, expectedKeys, summaryKeys); + assertPathDoesNotExist("temporary dir", + new Path(outputPath, CommitConstants.TEMPORARY)); + customPostExecutionValidation(outputPath, successData); + } + + /** + * Get the file count for the test. + * @return the number of mappers to create. + */ + public int getTestFileCount() { + return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; + } + + /** + * Override point to let implementations tune the MR Job conf. + * @param c configuration + */ + protected void applyCustomConfigOptions(Configuration c) { + + } + + /** + * Override point for any committer specific validation operations; + * called after the base assertions have all passed. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + protected void customPostExecutionValidation(Path destPath, + SuccessData successData) + throws Exception { + + } + + /** + * Test Mapper. + * This is executed in separate process, and must not make any assumptions + * about external state. + */ + public static class MapClass + extends Mapper { + + private int operations; + private String id = ""; + private LongWritable l = new LongWritable(); + private Text t = new Text(); + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + // force in Log4J logging + org.apache.log4j.BasicConfigurator.configure(); + boolean scaleMap = context.getConfiguration() + .getBoolean(KEY_SCALE_TESTS_ENABLED, false); + operations = scaleMap ? 1000 : 10; + id = context.getTaskAttemptID().toString(); + } + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + for (int i = 0; i < operations; i++) { + l.set(i); + t.set(String.format("%s:%05d", id, i)); + context.write(l, t); + } + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org
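A note on intended use: AbstractITCommitMRJob is designed to be subclassed once per committer. committerName() is the only required override and must return the same name that AbstractS3ACommitter#getName() reports for the committer under test, since testMRJob() asserts that name against the loaded _SUCCESS data; applyCustomConfigOptions() and customPostExecutionValidation() are optional hooks. The sketch below is illustrative only and is not part of this patch: the class name ITestExampleCommitMRJob and the literal committer name are invented for the example, and it assumes the inherited test base classes declare no further abstract methods.

package org.apache.hadoop.fs.s3a.commit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

/**
 * Illustrative subclass of AbstractITCommitMRJob (not part of this patch).
 * A real test would return the actual committer name and add any
 * committer-specific configuration and validation.
 */
public class ITestExampleCommitMRJob extends AbstractITCommitMRJob {

  /** Must match the committer name recorded in the _SUCCESS marker. */
  @Override
  protected String committerName() {
    return "example";   // assumption: substitute the real committer's name
  }

  /** Optional hook: tune the job configuration before submission. */
  @Override
  protected void applyCustomConfigOptions(Configuration conf) {
    // committer-specific options would be set here, if any
  }

  /** Optional hook: extra assertions after the base checks have passed. */
  @Override
  protected void customPostExecutionValidation(Path destPath,
      SuccessData successData) throws Exception {
    // e.g. inspect successData.getFilenames() or files under destPath
  }
}

With such a subclass in place, the inherited testMRJob() runs the full MiniMRYarnCluster job against the S3A destination and performs all of the shared assertions (output files, _SUCCESS contents, absence of the temporary directory) for that committer.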