Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4C441200B6D for ; Tue, 23 Aug 2016 19:32:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4AB60160ABF; Tue, 23 Aug 2016 17:32:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 077A3160A81 for ; Tue, 23 Aug 2016 19:32:14 +0200 (CEST) Received: (qmail 23096 invoked by uid 500); 23 Aug 2016 17:32:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 22260 invoked by uid 99); 23 Aug 2016 17:32:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Aug 2016 17:32:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 22AACE08AD; Tue, 23 Aug 2016 17:32:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Tue, 23 Aug 2016 17:32:26 -0000 Message-Id: <50b89a8b7dcd4761b0a90c8bcaf9ff8a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [22/50] [abbrv] hadoop git commit: HADOOP-13446. Support running isolated unit tests separate from AWS integration tests. Contributed by Chris Nauroth. archived-at: Tue, 23 Aug 2016 17:32:17 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java new file mode 100644 index 0000000..b0b8a65 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.util.StopWatch; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Basic unit test for S3A's blocking executor service. + */ +public class ITestBlockingThreadPoolExecutorService { + + private static final Logger LOG = LoggerFactory.getLogger( + BlockingThreadPoolExecutorService.class); + + private static final int NUM_ACTIVE_TASKS = 4; + private static final int NUM_WAITING_TASKS = 2; + private static final int TASK_SLEEP_MSEC = 100; + private static final int SHUTDOWN_WAIT_MSEC = 200; + private static final int SHUTDOWN_WAIT_TRIES = 5; + private static final int BLOCKING_THRESHOLD_MSEC = 50; + + private static final Integer SOME_VALUE = 1337; + + private static BlockingThreadPoolExecutorService tpe = null; + + @AfterClass + public static void afterClass() throws Exception { + ensureDestroyed(); + } + + /** + * Basic test of running one trivial task. + */ + @Test + public void testSubmitCallable() throws Exception { + ensureCreated(); + ListenableFuture f = tpe.submit(callableSleeper); + Integer v = f.get(); + assertEquals(SOME_VALUE, v); + } + + /** + * More involved test, including detecting blocking when at capacity. + */ + @Test + public void testSubmitRunnable() throws Exception { + ensureCreated(); + int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS; + StopWatch stopWatch = new StopWatch().start(); + for (int i = 0; i < totalTasks; i++) { + tpe.submit(sleeper); + assertDidntBlock(stopWatch); + } + tpe.submit(sleeper); + assertDidBlock(stopWatch); + } + + @Test + public void testShutdown() throws Exception { + // Cover create / destroy, regardless of when this test case runs + ensureCreated(); + ensureDestroyed(); + + // Cover create, execute, destroy, regardless of when test case runs + ensureCreated(); + testSubmitRunnable(); + ensureDestroyed(); + } + + // Helper functions, etc. + + private void assertDidntBlock(StopWatch sw) { + try { + assertFalse("Non-blocking call took too long.", + sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC); + } finally { + sw.reset().start(); + } + } + + private void assertDidBlock(StopWatch sw) { + try { + if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) { + throw new RuntimeException("Blocking call returned too fast."); + } + } finally { + sw.reset().start(); + } + } + + private Runnable sleeper = new Runnable() { + @Override + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.sleep(TASK_SLEEP_MSEC); + } catch (InterruptedException e) { + LOG.info("Thread {} interrupted.", name); + Thread.currentThread().interrupt(); + } + } + }; + + private Callable callableSleeper = new Callable() { + @Override + public Integer call() throws Exception { + sleeper.run(); + return SOME_VALUE; + } + }; + + /** + * Helper function to create thread pool under test. + */ + private static void ensureCreated() throws Exception { + if (tpe == null) { + LOG.debug("Creating thread pool"); + tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS, + NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest"); + } + } + + /** + * Helper function to terminate thread pool under test, asserting that + * shutdown -> terminate works as expected. + */ + private static void ensureDestroyed() throws Exception { + if (tpe == null) { + return; + } + int shutdownTries = SHUTDOWN_WAIT_TRIES; + + tpe.shutdown(); + if (!tpe.isShutdown()) { + throw new RuntimeException("Shutdown had no effect."); + } + + while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, + TimeUnit.MILLISECONDS)) { + LOG.info("Waiting for thread pool shutdown."); + if (shutdownTries-- <= 0) { + LOG.error("Failed to terminate thread pool gracefully."); + break; + } + } + if (!tpe.isTerminated()) { + tpe.shutdownNow(); + if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, + TimeUnit.MILLISECONDS)) { + throw new RuntimeException( + "Failed to terminate thread pool in timely manner."); + } + } + tpe = null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java new file mode 100644 index 0000000..cf8783c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.AccessDeniedException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.junit.Assert.*; + +/** + * Tests for {@link Constants#AWS_CREDENTIALS_PROVIDER} logic. + * + */ +public class ITestS3AAWSCredentialsProvider { + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AAWSCredentialsProvider.class); + + @Rule + public Timeout testTimeout = new Timeout(1 * 60 * 1000); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + /** + * Declare what exception to raise, and the text which must be found + * in it. + * @param exceptionClass class of exception + * @param text text in exception + */ + private void expectException(Class exceptionClass, + String text) { + exception.expect(exceptionClass); + exception.expectMessage(text); + } + + @Test + public void testBadConfiguration() throws IOException { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class"); + try { + createFailingFS(conf); + } catch (IOException e) { + if (!(e.getCause() instanceof ClassNotFoundException)) { + LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e); + throw e; + } + } + } + + /** + * Create a filesystem, expect it to fail by raising an IOException. + * Raises an assertion exception if in fact the FS does get instantiated. + * @param conf configuration + * @throws IOException an expected exception. + */ + private void createFailingFS(Configuration conf) throws IOException { + S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf); + fs.listStatus(new Path("/")); + fail("Expected exception - got " + fs); + } + + static class BadCredentialsProvider implements AWSCredentialsProvider { + + @SuppressWarnings("unused") + public BadCredentialsProvider(URI name, Configuration conf) { + } + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials("bad_key", "bad_secret"); + } + + @Override + public void refresh() { + } + } + + @Test + public void testBadCredentials() throws Exception { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName()); + try { + createFailingFS(conf); + } catch (AccessDeniedException e) { + // expected + } + } + + static class GoodCredentialsProvider extends AWSCredentialsProviderChain { + + @SuppressWarnings("unused") + public GoodCredentialsProvider(URI name, Configuration conf) { + super(new BasicAWSCredentialsProvider(conf.get(ACCESS_KEY), + conf.get(SECRET_KEY)), new InstanceProfileCredentialsProvider()); + } + } + + @Test + public void testGoodProvider() throws Exception { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, GoodCredentialsProvider.class.getName()); + S3ATestUtils.createTestFileSystem(conf); + } + + @Test + public void testAnonymousProvider() throws Exception { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, + AnonymousAWSCredentialsProvider.class.getName()); + Path testFile = new Path( + conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE)); + S3ATestUtils.useCSVDataEndpoint(conf); + FileSystem fs = FileSystem.newInstance(testFile.toUri(), conf); + assertNotNull(fs); + assertTrue(fs instanceof S3AFileSystem); + FileStatus stat = fs.getFileStatus(testFile); + assertNotNull(stat); + assertEquals(testFile, stat.getPath()); + } + + /** + * A credential provider whose constructor signature doesn't match. + */ + static class ConstructorSignatureErrorProvider + implements AWSCredentialsProvider { + + @SuppressWarnings("unused") + public ConstructorSignatureErrorProvider(String str) { + } + + @Override + public AWSCredentials getCredentials() { + return null; + } + + @Override + public void refresh() { + } + } + + /** + * A credential provider whose constructor raises an NPE. + */ + static class ConstructorFailureProvider + implements AWSCredentialsProvider { + + @SuppressWarnings("unused") + public ConstructorFailureProvider() { + throw new NullPointerException("oops"); + } + + @Override + public AWSCredentials getCredentials() { + return null; + } + + @Override + public void refresh() { + } + } + + @Test + public void testProviderWrongClass() throws Exception { + expectProviderInstantiationFailure(this.getClass().getName(), + NOT_AWS_PROVIDER); + } + + @Test + public void testProviderNotAClass() throws Exception { + expectProviderInstantiationFailure("NoSuchClass", + "ClassNotFoundException"); + } + + private void expectProviderInstantiationFailure(String option, + String expectedErrorText) throws IOException { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, option); + Path testFile = new Path( + conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE)); + expectException(IOException.class, expectedErrorText); + URI uri = testFile.toUri(); + S3AUtils.createAWSCredentialProviderSet(uri, conf, uri); + } + + @Test + public void testProviderConstructorError() throws Exception { + expectProviderInstantiationFailure( + ConstructorSignatureErrorProvider.class.getName(), + CONSTRUCTOR_EXCEPTION); + } + + @Test + public void testProviderFailureError() throws Exception { + expectProviderInstantiationFailure( + ConstructorFailureProvider.class.getName(), + INSTANTIATION_EXCEPTION); + } + + @Test + public void testInstantiationChain() throws Throwable { + Configuration conf = new Configuration(); + conf.set(AWS_CREDENTIALS_PROVIDER, + TemporaryAWSCredentialsProvider.NAME + + ", \t" + SimpleAWSCredentialsProvider.NAME + + " ,\n " + AnonymousAWSCredentialsProvider.NAME); + Path testFile = new Path( + conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE)); + + URI uri = testFile.toUri(); + S3AUtils.createAWSCredentialProviderSet(uri, conf, uri); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java new file mode 100644 index 0000000..4444d0c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +/** + * Demonstrate that the threadpool blocks additional client requests if + * its queue is full (rather than throwing an exception) by initiating an + * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The + * 4th part should not trigger an exception as it would with a + * non-blocking threadpool. + */ +public class ITestS3ABlockingThreadPool { + + private Configuration conf; + private S3AFileSystem fs; + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + protected Path getTestPath() { + return new Path("/tests3a"); + } + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); + conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); + conf.setInt(Constants.MAX_THREADS, 2); + conf.setInt(Constants.MAX_TOTAL_TASKS, 1); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(getTestPath(), true); + } + } + + @Test + public void testRegularMultiPartUpload() throws Exception { + fs = S3ATestUtils.createTestFileSystem(conf); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * + 1024); + } + + @Test + public void testFastMultiPartUpload() throws Exception { + conf.setBoolean(Constants.FAST_UPLOAD, true); + fs = S3ATestUtils.createTestFileSystem(conf); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * + 1024); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java new file mode 100644 index 0000000..9a6dae7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.fileStatsToString; + +/** + * S3A tests for configuring block size. + */ +public class ITestS3ABlocksize extends AbstractFSContractTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ABlocksize.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Test + @SuppressWarnings("deprecation") + public void testBlockSize() throws Exception { + FileSystem fs = getFileSystem(); + long defaultBlockSize = fs.getDefaultBlockSize(); + assertEquals("incorrect blocksize", + S3AFileSystem.DEFAULT_BLOCKSIZE, defaultBlockSize); + long newBlockSize = defaultBlockSize * 2; + fs.getConf().setLong(Constants.FS_S3A_BLOCK_SIZE, newBlockSize); + + Path dir = path("testBlockSize"); + Path file = new Path(dir, "file"); + createFile(fs, file, true, dataset(1024, 'a', 'z' - 'a')); + FileStatus fileStatus = fs.getFileStatus(file); + assertEquals("Double default block size in stat(): " + fileStatus, + newBlockSize, + fileStatus.getBlockSize()); + + // check the listing & assert that the block size is picked up by + // this route too. + boolean found = false; + FileStatus[] listing = fs.listStatus(dir); + for (FileStatus stat : listing) { + LOG.info("entry: {}", stat); + if (file.equals(stat.getPath())) { + found = true; + assertEquals("Double default block size in ls(): " + stat, + newBlockSize, + stat.getBlockSize()); + } + } + assertTrue("Did not find " + fileStatsToString(listing, ", "), found); + } + + @Test + public void testRootFileStatusHasBlocksize() throws Throwable { + FileSystem fs = getFileSystem(); + FileStatus status = fs.getFileStatus(new Path("/")); + assertTrue("Invalid root blocksize", + status.getBlockSize() >= 0); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java new file mode 100644 index 0000000..4e99339 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.net.URI; + +import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.util.VersionInfo; +import org.apache.http.HttpStatus; +import org.junit.rules.TemporaryFolder; + +/** + * S3A tests for configuration. + */ +public class ITestS3AConfiguration { + private static final String EXAMPLE_ID = "AKASOMEACCESSKEY"; + private static final String EXAMPLE_KEY = + "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE"; + + private Configuration conf; + private S3AFileSystem fs; + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AConfiguration.class); + + private static final String TEST_ENDPOINT = "test.fs.s3a.endpoint"; + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + + /** + * Test if custom endpoint is picked up. + *

+ * The test expects TEST_ENDPOINT to be defined in the Configuration + * describing the endpoint of the bucket to which TEST_FS_S3A_NAME points + * (f.i. "s3-eu-west-1.amazonaws.com" if the bucket is located in Ireland). + * Evidently, the bucket has to be hosted in the region denoted by the + * endpoint for the test to succeed. + *

+ * More info and the list of endpoint identifiers: + * http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region + * + * @throws Exception + */ + @Test + public void testEndpoint() throws Exception { + conf = new Configuration(); + String endpoint = conf.getTrimmed(TEST_ENDPOINT, ""); + if (endpoint.isEmpty()) { + LOG.warn("Custom endpoint test skipped as " + TEST_ENDPOINT + "config " + + "setting was not detected"); + } else { + conf.set(Constants.ENDPOINT, endpoint); + fs = S3ATestUtils.createTestFileSystem(conf); + AmazonS3Client s3 = fs.getAmazonS3Client(); + String endPointRegion = ""; + // Differentiate handling of "s3-" and "s3." based endpoint identifiers + String[] endpointParts = StringUtils.split(endpoint, '.'); + if (endpointParts.length == 3) { + endPointRegion = endpointParts[0].substring(3); + } else if (endpointParts.length == 4) { + endPointRegion = endpointParts[1]; + } else { + fail("Unexpected endpoint"); + } + assertEquals("Endpoint config setting and bucket location differ: ", + endPointRegion, s3.getBucketLocation(fs.getUri().getHost())); + } + } + + @Test + public void testProxyConnection() throws Exception { + conf = new Configuration(); + conf.setInt(Constants.MAX_ERROR_RETRIES, 2); + conf.set(Constants.PROXY_HOST, "127.0.0.1"); + conf.setInt(Constants.PROXY_PORT, 1); + String proxy = + conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a connection error for proxy server at " + proxy); + } catch (AWSClientIOException e) { + // expected + } + } + + @Test + public void testProxyPortWithoutHost() throws Exception { + conf = new Configuration(); + conf.unset(Constants.PROXY_HOST); + conf.setInt(Constants.MAX_ERROR_RETRIES, 2); + conf.setInt(Constants.PROXY_PORT, 1); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a proxy configuration error"); + } catch (IllegalArgumentException e) { + String msg = e.toString(); + if (!msg.contains(Constants.PROXY_HOST) && + !msg.contains(Constants.PROXY_PORT)) { + throw e; + } + } + } + + @Test + public void testAutomaticProxyPortSelection() throws Exception { + conf = new Configuration(); + conf.unset(Constants.PROXY_PORT); + conf.setInt(Constants.MAX_ERROR_RETRIES, 2); + conf.set(Constants.PROXY_HOST, "127.0.0.1"); + conf.set(Constants.SECURE_CONNECTIONS, "true"); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a connection error for proxy server"); + } catch (AWSClientIOException e) { + // expected + } + conf.set(Constants.SECURE_CONNECTIONS, "false"); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a connection error for proxy server"); + } catch (AWSClientIOException e) { + // expected + } + } + + @Test + public void testUsernameInconsistentWithPassword() throws Exception { + conf = new Configuration(); + conf.setInt(Constants.MAX_ERROR_RETRIES, 2); + conf.set(Constants.PROXY_HOST, "127.0.0.1"); + conf.setInt(Constants.PROXY_PORT, 1); + conf.set(Constants.PROXY_USERNAME, "user"); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a connection error for proxy server"); + } catch (IllegalArgumentException e) { + String msg = e.toString(); + if (!msg.contains(Constants.PROXY_USERNAME) && + !msg.contains(Constants.PROXY_PASSWORD)) { + throw e; + } + } + conf = new Configuration(); + conf.setInt(Constants.MAX_ERROR_RETRIES, 2); + conf.set(Constants.PROXY_HOST, "127.0.0.1"); + conf.setInt(Constants.PROXY_PORT, 1); + conf.set(Constants.PROXY_PASSWORD, "password"); + try { + fs = S3ATestUtils.createTestFileSystem(conf); + fail("Expected a connection error for proxy server"); + } catch (IllegalArgumentException e) { + String msg = e.toString(); + if (!msg.contains(Constants.PROXY_USERNAME) && + !msg.contains(Constants.PROXY_PASSWORD)) { + throw e; + } + } + } + + @Test + public void testCredsFromCredentialProvider() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword()); + } + + void provisionAccessKeys(final Configuration conf) throws Exception { + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.ACCESS_KEY, + EXAMPLE_ID.toCharArray()); + provider.createCredentialEntry(Constants.SECRET_KEY, + EXAMPLE_KEY.toCharArray()); + provider.flush(); + } + + @Test + public void testCredsFromUserInfo() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + URI uriWithUserInfo = new URI("s3a://123:456@foobar"); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf); + assertEquals("AccessKey incorrect.", "123", creds.getUser()); + assertEquals("SecretKey incorrect.", "456", creds.getPassword()); + } + + @Test + public void testIDFromUserInfoSecretFromCredentialProvider() + throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + URI uriWithUserInfo = new URI("s3a://123@foobar"); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf); + assertEquals("AccessKey incorrect.", "123", creds.getUser()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword()); + } + + @Test + public void testSecretFromCredentialProviderIDFromConfig() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.SECRET_KEY, + EXAMPLE_KEY.toCharArray()); + provider.flush(); + + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword()); + } + + @Test + public void testIDFromCredentialProviderSecretFromConfig() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.ACCESS_KEY, + EXAMPLE_ID.toCharArray()); + provider.flush(); + + conf.set(Constants.SECRET_KEY, EXAMPLE_KEY); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword()); + } + + @Test + public void testExcludingS3ACredentialProvider() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + "jceks://s3a/foobar," + jks.toString()); + + // first make sure that the s3a based provider is removed + Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders( + conf, S3AFileSystem.class); + String newPath = conf.get( + CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH); + assertFalse("Provider Path incorrect", newPath.contains("s3a://")); + + // now let's make sure the new path is created by the S3AFileSystem + // and the integration still works. Let's provision the keys through + // the altered configuration instance and then try and access them + // using the original config with the s3a provider in the path. + provisionAccessKeys(c); + + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + URI uriWithUserInfo = new URI("s3a://123:456@foobar"); + S3xLoginHelper.Login creds = + S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf); + assertEquals("AccessKey incorrect.", "123", creds.getUser()); + assertEquals("SecretKey incorrect.", "456", creds.getPassword()); + + } + + @Test + public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() + throws Exception { + + conf = new Configuration(); + conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true)); + assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false)); + + try { + fs = S3ATestUtils.createTestFileSystem(conf); + assertNotNull(fs); + AmazonS3Client s3 = fs.getAmazonS3Client(); + assertNotNull(s3); + S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class, + "clientOptions"); + assertTrue("Expected to find path style access to be switched on!", + clientOptions.isPathStyleAccess()); + byte[] file = ContractTestUtils.toAsciiByteArray("test file"); + ContractTestUtils.writeAndRead(fs, + new Path("/path/style/access/testFile"), file, file.length, + conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true); + } catch (final AWSS3IOException e) { + LOG.error("Caught exception: ", e); + // Catch/pass standard path style access behaviour when live bucket + // isn't in the same region as the s3 client default. See + // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html + assertEquals(e.getStatusCode(), HttpStatus.SC_MOVED_PERMANENTLY); + } + } + + @Test + public void testDefaultUserAgent() throws Exception { + conf = new Configuration(); + fs = S3ATestUtils.createTestFileSystem(conf); + assertNotNull(fs); + AmazonS3Client s3 = fs.getAmazonS3Client(); + assertNotNull(s3); + ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, + "clientConfiguration"); + assertEquals("Hadoop " + VersionInfo.getVersion(), awsConf.getUserAgent()); + } + + @Test + public void testCustomUserAgent() throws Exception { + conf = new Configuration(); + conf.set(Constants.USER_AGENT_PREFIX, "MyApp"); + fs = S3ATestUtils.createTestFileSystem(conf); + assertNotNull(fs); + AmazonS3Client s3 = fs.getAmazonS3Client(); + assertNotNull(s3); + ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, + "clientConfiguration"); + assertEquals("MyApp, Hadoop " + VersionInfo.getVersion(), + awsConf.getUserAgent()); + } + + /** + * Reads and returns a field from an object using reflection. If the field + * cannot be found, is null, or is not the expected type, then this method + * fails the test. + * + * @param target object to read + * @param fieldType type of field to read, which will also be the return type + * @param fieldName name of field to read + * @return field that was read + * @throws IllegalAccessException if access not allowed + */ + private static T getField(Object target, Class fieldType, + String fieldName) throws IllegalAccessException { + Object obj = FieldUtils.readField(target, fieldName, true); + assertNotNull(String.format( + "Could not read field named %s in object with class %s.", fieldName, + target.getClass().getName()), obj); + assertTrue(String.format( + "Unexpected type found for field named %s, expected %s, actual %s.", + fieldName, fieldType.getName(), obj.getClass().getName()), + fieldType.isAssignableFrom(obj.getClass())); + return fieldType.cast(obj); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java new file mode 100644 index 0000000..b3d7abf --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLEncoder; +import java.nio.file.AccessDeniedException; + +import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME; + +/** + * Tests that credentials can go into the URL. This includes a valid + * set, and a check that an invalid set do at least get stripped out + * of the final URI + */ +public class ITestS3ACredentialsInURL extends Assert { + private S3AFileSystem fs; + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ACredentialsInURL.class); + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @After + public void teardown() { + IOUtils.closeStream(fs); + } + + /** + * Test instantiation. + * @throws Throwable + */ + @Test + public void testInstantiateFromURL() throws Throwable { + + Configuration conf = new Configuration(); + String accessKey = conf.get(Constants.ACCESS_KEY); + String secretKey = conf.get(Constants.SECRET_KEY); + String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, ""); + Assume.assumeNotNull(fsname, accessKey, secretKey); + URI original = new URI(fsname); + URI secretsURI = createUriWithEmbeddedSecrets(original, + accessKey, secretKey); + if (secretKey.contains("/")) { + assertTrue("test URI encodes the / symbol", secretsURI.toString(). + contains("%252F")); + } + if (secretKey.contains("+")) { + assertTrue("test URI encodes the + symbol", secretsURI.toString(). + contains("%252B")); + } + assertFalse("Does not contain secrets", original.equals(secretsURI)); + + conf.set(TEST_FS_S3A_NAME, secretsURI.toString()); + conf.unset(Constants.ACCESS_KEY); + conf.unset(Constants.SECRET_KEY); + fs = S3ATestUtils.createTestFileSystem(conf); + String fsURI = fs.getUri().toString(); + assertFalse("FS URI contains a @ symbol", fsURI.contains("@")); + assertFalse("FS URI contains a % symbol", fsURI.contains("%")); + if (!original.toString().startsWith(fsURI)) { + fail("Filesystem URI does not match original"); + } + validate("original path", new Path(original)); + validate("bare path", new Path("/")); + validate("secrets path", new Path(secretsURI)); + } + + private void validate(String text, Path path) throws IOException { + try { + fs.canonicalizeUri(path.toUri()); + fs.checkPath(path); + assertTrue(text + " Not a directory", + fs.getFileStatus(new Path("/")).isDirectory()); + fs.globStatus(path); + } catch (AssertionError e) { + throw e; + } catch (Exception e) { + LOG.debug("{} failure: {}", text, e, e); + fail(text + " Test failed"); + } + } + + /** + * Set up some invalid credentials, verify login is rejected. + * @throws Throwable + */ + @Test + public void testInvalidCredentialsFail() throws Throwable { + Configuration conf = new Configuration(); + String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, ""); + Assume.assumeNotNull(fsname); + URI original = new URI(fsname); + URI testURI = createUriWithEmbeddedSecrets(original, "user", "//"); + + conf.set(TEST_FS_S3A_NAME, testURI.toString()); + fs = S3ATestUtils.createTestFileSystem(conf); + try { + S3AFileStatus status = fs.getFileStatus(new Path("/")); + fail("Expected an AccessDeniedException, got " + status); + } catch (AccessDeniedException e) { + // expected + } + + } + + private URI createUriWithEmbeddedSecrets(URI original, + String accessKey, + String secretKey) throws UnsupportedEncodingException { + String encodedSecretKey = URLEncoder.encode(secretKey, "UTF-8"); + String formattedString = String.format("%s://%s:%s@%s/%s/", + original.getScheme(), + accessKey, + encodedSecretKey, + original.getHost(), + original.getPath()); + URI testURI; + try { + testURI = new Path(formattedString).toUri(); + } catch (IllegalArgumentException e) { + // inner cause is stripped to keep any secrets out of stack traces + throw new IllegalArgumentException("Could not encode Path"); + } + return testURI; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java new file mode 100644 index 0000000..4543278 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.ObjectMetadata; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +/** + * Test whether or not encryption works by turning it on. Some checks + * are made for different file sizes as there have been reports that the + * file length may be rounded up to match word boundaries. + */ +public class ITestS3AEncryption extends AbstractS3ATestBase { + private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256; + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + AES256); + return conf; + } + + private static final int[] SIZES = { + 0, 1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 10 - 3, 2 ^ 11 - 2, 2 ^ 12 - 1 + }; + + @Override + public void teardown() throws Exception { + super.teardown(); + IOUtils.closeStream(getFileSystem()); + } + + @Test + public void testEncryption() throws Throwable { + for (int size: SIZES) { + validateEncryptionForFilesize(size); + } + } + + @Test + public void testEncryptionOverRename() throws Throwable { + skipIfEncryptionTestsDisabled(getConfiguration()); + Path src = path(createFilename(1024)); + byte[] data = dataset(1024, 'a', 'z'); + S3AFileSystem fs = getFileSystem(); + writeDataset(fs, src, data, data.length, 1024 * 1024, true); + ContractTestUtils.verifyFileContents(fs, src, data); + Path dest = path(src.getName() + "-copy"); + fs.rename(src, dest); + ContractTestUtils.verifyFileContents(fs, dest, data); + assertEncrypted(dest); + } + + protected void validateEncryptionForFilesize(int len) throws IOException { + skipIfEncryptionTestsDisabled(getConfiguration()); + describe("Create an encrypted file of size " + len); + String src = createFilename(len); + Path path = writeThenReadFile(src, len); + assertEncrypted(path); + rm(getFileSystem(), path, false, false); + } + + private String createFilename(int len) { + return String.format("%s-%04x", methodName.getMethodName(), len); + } + + /** + * Assert that at path references an encrypted blob. + * @param path path + * @throws IOException on a failure + */ + private void assertEncrypted(Path path) throws IOException { + ObjectMetadata md = getFileSystem().getObjectMetadata(path); + assertEquals(AES256, md.getSSEAlgorithm()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java new file mode 100644 index 0000000..81578c2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +/** + * Test whether or not encryption settings propagate by choosing an invalid + * one. We expect the write to fail with a 400 bad request error + */ +public class ITestS3AEncryptionAlgorithmPropagation + extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + "DES"); + return conf; + } + + @Override + public void teardown() throws Exception { + super.teardown(); + IOUtils.closeStream(getFileSystem()); + } + + @Test + public void testEncrypt0() throws Throwable { + writeThenReadFileToFailure(0); + } + + @Test + public void testEncrypt256() throws Throwable { + writeThenReadFileToFailure(256); + } + + /** + * Make this a no-op so test setup doesn't fail. + * @param path path path + * @throws IOException on any failure + */ + @Override + protected void mkdirs(Path path) throws IOException { + + } + + protected void writeThenReadFileToFailure(int len) throws IOException { + skipIfEncryptionTestsDisabled(getConfiguration()); + describe("Create an encrypted file of size " + len); + try { + writeThenReadFile(methodName.getMethodName() + '-' + len, len); + fail("Expected an exception about an illegal encryption algorithm"); + } catch (AWSS3IOException e) { + assertStatusCode(e, 400); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java new file mode 100644 index 0000000..c06fed1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; + +/** + * Run the encryption tests against the Fast output stream. + * This verifies that both file writing paths can encrypt their data. + */ +public class ITestS3AEncryptionFastOutputStream extends ITestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(Constants.FAST_UPLOAD, true); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java new file mode 100644 index 0000000..d8e017e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.nio.file.AccessDeniedException; +import java.util.concurrent.Callable; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; + +/** + * Test S3A Failure translation, including a functional test + * generating errors during stream IO. + */ +public class ITestS3AFailureHandling extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AFailureHandling.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Test + public void testReadFileChanged() throws Throwable { + describe("overwrite a file with a shorter one during a read, seek"); + final int fullLength = 8192; + final byte[] fullDataset = dataset(fullLength, 'a', 32); + final int shortLen = 4096; + final byte[] shortDataset = dataset(shortLen, 'A', 32); + final FileSystem fs = getFileSystem(); + final Path testpath = path("readFileToChange.txt"); + // initial write + writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false); + try(FSDataInputStream instream = fs.open(testpath)) { + instream.seek(fullLength - 16); + assertTrue("no data to read", instream.read() >= 0); + // overwrite + writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); + // here the file length is less. Probe the file to see if this is true, + // with a spin and wait + eventually(30 *1000, new Callable() { + @Override + public Void call() throws Exception { + assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); + return null; + } + }); + // here length is shorter. Assuming it has propagated to all replicas, + // the position of the input stream is now beyond the EOF. + // An attempt to seek backwards to a position greater than the + // short length will raise an exception from AWS S3, which must be + // translated into an EOF + + instream.seek(shortLen + 1024); + int c = instream.read(); + assertIsEOF("read()", c); + + byte[] buf = new byte[256]; + + assertIsEOF("read(buffer)", instream.read(buf)); + assertIsEOF("read(offset)", + instream.read(instream.getPos(), buf, 0, buf.length)); + + // now do a block read fully, again, backwards from the current pos + try { + instream.readFully(shortLen + 512, buf); + fail("Expected readFully to fail"); + } catch (EOFException expected) { + LOG.debug("Expected EOF: ", expected); + } + + assertIsEOF("read(offset)", + instream.read(shortLen + 510, buf, 0, buf.length)); + + // seek somewhere useful + instream.seek(shortLen - 256); + + // delete the file. Reads must fail + fs.delete(testpath, false); + + try { + int r = instream.read(); + fail("Expected an exception, got " + r); + } catch (FileNotFoundException e) { + // expected + } + + try { + instream.readFully(2048, buf); + fail("Expected readFully to fail"); + } catch (FileNotFoundException e) { + // expected + } + + } + } + + /** + * Assert that a read operation returned an EOF value. + * @param operation specific operation + * @param readResult result + */ + private void assertIsEOF(String operation, int readResult) { + assertEquals("Expected EOF from "+ operation + + "; got char " + (char) readResult, -1, readResult); + } + + @Test + public void test404isNotFound() throws Throwable { + verifyTranslated(FileNotFoundException.class, createS3Exception(404)); + } + + protected Exception verifyTranslated(Class clazz, + AmazonClientException exception) throws Exception { + return verifyExceptionClass(clazz, + translateException("test", "/", exception)); + } + + @Test + public void test401isNotPermittedFound() throws Throwable { + verifyTranslated(AccessDeniedException.class, + createS3Exception(401)); + } + + protected AmazonS3Exception createS3Exception(int code) { + AmazonS3Exception source = new AmazonS3Exception(""); + source.setStatusCode(code); + return source; + } + + @Test + public void testGenericS3Exception() throws Throwable { + // S3 exception of no known type + AWSS3IOException ex = (AWSS3IOException)verifyTranslated( + AWSS3IOException.class, + createS3Exception(451)); + assertEquals(451, ex.getStatusCode()); + } + + @Test + public void testGenericServiceS3Exception() throws Throwable { + // service exception of no known type + AmazonServiceException ase = new AmazonServiceException("unwind"); + ase.setStatusCode(500); + AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated( + AWSServiceIOException.class, + ase); + assertEquals(500, ex.getStatusCode()); + } + + @Test + public void testGenericClientException() throws Throwable { + // Generic Amazon exception + verifyTranslated(AWSClientIOException.class, + new AmazonClientException("")); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java new file mode 100644 index 0000000..b5fa1c3 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * Tests regular and multi-part upload functionality for S3AFastOutputStream. + * File sizes are kept small to reduce test duration on slow connections + */ +public class ITestS3AFastOutputStream { + private FileSystem fs; + + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); + conf.setBoolean(Constants.FAST_UPLOAD, true); + fs = S3ATestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(getTestPath(), true); + } + } + + protected Path getTestPath() { + return new Path("/tests3a"); + } + + @Test + public void testRegularUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); + } + + @Test + public void testMultiPartUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * + 1024); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java new file mode 100644 index 0000000..2a6ba0c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.net.URI; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff; +import static org.apache.hadoop.test.GenericTestUtils.getTestDir; + +/** + * Use metrics to assert about the cost of file status queries. + * {@link S3AFileSystem#getFileStatus(Path)}. + */ +public class ITestS3AFileOperationCost extends AbstractFSContractTestBase { + + private MetricDiff metadataRequests; + private MetricDiff listRequests; + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AFileOperationCost.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public S3AFileSystem getFileSystem() { + return (S3AFileSystem) super.getFileSystem(); + } + + @Override + public void setup() throws Exception { + super.setup(); + S3AFileSystem fs = getFileSystem(); + metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); + listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); + } + + @Test + public void testCostOfGetFileStatusOnFile() throws Throwable { + describe("performing getFileStatus on a file"); + Path simpleFile = path("simple.txt"); + S3AFileSystem fs = getFileSystem(); + touch(fs, simpleFile); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(simpleFile); + assertTrue("not a file: " + status, status.isFile()); + metadataRequests.assertDiffEquals(1); + listRequests.assertDiffEquals(0); + } + + private void resetMetricDiffs() { + reset(metadataRequests, listRequests); + } + + @Test + public void testCostOfGetFileStatusOnEmptyDir() throws Throwable { + describe("performing getFileStatus on an empty directory"); + S3AFileSystem fs = getFileSystem(); + Path dir = path("empty"); + fs.mkdirs(dir); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(dir); + assertTrue("not empty: " + status, status.isEmptyDirectory()); + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(0); + } + + @Test + public void testCostOfGetFileStatusOnMissingFile() throws Throwable { + describe("performing getFileStatus on a missing file"); + S3AFileSystem fs = getFileSystem(); + Path path = path("missing"); + resetMetricDiffs(); + try { + S3AFileStatus status = fs.getFileStatus(path); + fail("Got a status back from a missing file path " + status); + } catch (FileNotFoundException expected) { + // expected + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfGetFileStatusOnMissingSubPath() throws Throwable { + describe("performing getFileStatus on a missing file"); + S3AFileSystem fs = getFileSystem(); + Path path = path("missingdir/missingpath"); + resetMetricDiffs(); + try { + S3AFileStatus status = fs.getFileStatus(path); + fail("Got a status back from a missing file path " + status); + } catch (FileNotFoundException expected) { + // expected + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfGetFileStatusOnNonEmptyDir() throws Throwable { + describe("performing getFileStatus on a non-empty directory"); + S3AFileSystem fs = getFileSystem(); + Path dir = path("empty"); + fs.mkdirs(dir); + Path simpleFile = new Path(dir, "simple.txt"); + touch(fs, simpleFile); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(dir); + if (status.isEmptyDirectory()) { + // erroneous state + String fsState = fs.toString(); + fail("FileStatus says directory isempty: " + status + + "\n" + ContractTestUtils.ls(fs, dir) + + "\n" + fsState); + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfCopyFromLocalFile() throws Throwable { + describe("testCostOfCopyFromLocalFile"); + File localTestDir = getTestDir("tmp"); + localTestDir.mkdirs(); + File tmpFile = File.createTempFile("tests3acost", ".txt", + localTestDir); + tmpFile.delete(); + try { + URI localFileURI = tmpFile.toURI(); + FileSystem localFS = FileSystem.get(localFileURI, + getFileSystem().getConf()); + Path localPath = new Path(localFileURI); + int len = 10 * 1024; + byte[] data = dataset(len, 'A', 'Z'); + writeDataset(localFS, localPath, data, len, 1024, true); + S3AFileSystem s3a = getFileSystem(); + MetricDiff copyLocalOps = new MetricDiff(s3a, + INVOCATION_COPY_FROM_LOCAL_FILE); + MetricDiff putRequests = new MetricDiff(s3a, + OBJECT_PUT_REQUESTS); + MetricDiff putBytes = new MetricDiff(s3a, + OBJECT_PUT_BYTES); + + Path remotePath = path("copied"); + s3a.copyFromLocalFile(false, true, localPath, remotePath); + verifyFileContents(s3a, remotePath, data); + copyLocalOps.assertDiffEquals(1); + putRequests.assertDiffEquals(1); + putBytes.assertDiffEquals(len); + // print final stats + LOG.info("Filesystem {}", s3a); + } finally { + tmpFile.delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java new file mode 100644 index 0000000..858ac22 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; + +/** + * Tests a live S3 system. If your keys and bucket aren't specified, all tests + * are marked as passed. + * + * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from + * TestCase which uses the old Junit3 runner that doesn't ignore assumptions + * properly making it impossible to skip the tests if we don't have a valid + * bucket. + **/ +public class ITestS3AFileSystemContract extends FileSystemContractBaseTest { + + protected static final Logger LOG = + LoggerFactory.getLogger(ITestS3AFileSystemContract.class); + + @Override + public void setUp() throws Exception { + Configuration conf = new Configuration(); + + fs = S3ATestUtils.createTestFileSystem(conf); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + if (fs != null) { + fs.delete(path("test"), true); + } + super.tearDown(); + } + + @Override + public void testMkdirsWithUmask() throws Exception { + // not supported + } + + @Override + public void testRenameFileAsExistingFile() throws Exception { + if (!renameSupported()) { + return; + } + + Path src = path("/test/hadoop/file"); + createFile(src); + Path dst = path("/test/new/newfile"); + createFile(dst); + // s3 doesn't support rename option + // rename-overwrites-dest is always allowed. + rename(src, dst, true, false, true); + } + + @Override + public void testRenameDirectoryAsExistingDirectory() throws Exception { + if (!renameSupported()) { + return; + } + + Path src = path("/test/hadoop/dir"); + fs.mkdirs(src); + createFile(path("/test/hadoop/dir/file1")); + createFile(path("/test/hadoop/dir/subdir/file2")); + + Path dst = path("/test/new/newdir"); + fs.mkdirs(dst); + rename(src, dst, true, false, true); + assertFalse("Nested file1 exists", + fs.exists(path("/test/hadoop/dir/file1"))); + assertFalse("Nested file2 exists", + fs.exists(path("/test/hadoop/dir/subdir/file2"))); + assertTrue("Renamed nested file1 exists", + fs.exists(path("/test/new/newdir/file1"))); + assertTrue("Renamed nested exists", + fs.exists(path("/test/new/newdir/subdir/file2"))); + } + +// @Override + public void testMoveDirUnderParent() throws Throwable { + // not support because + // Fails if dst is a directory that is not empty. + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java new file mode 100644 index 0000000..360a151 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.net.URI; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient; +import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest; +import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; +import com.amazonaws.services.securitytoken.model.Credentials; + +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.apache.hadoop.conf.Configuration; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * Tests use of temporary credentials (for example, AWS STS & S3). + * This test extends a class that "does things to the root directory", and + * should only be used against transient filesystems where you don't care about + * the data. + */ +public class ITestS3ATemporaryCredentials extends AbstractFSContractTestBase { + public static final String TEST_STS_ENABLED = "test.fs.s3a.sts.enabled"; + public static final String TEST_STS_ENDPOINT = "test.fs.s3a.sts.endpoint"; + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ATemporaryCredentials.class); + + private static final String PROVIDER_CLASS + = TemporaryAWSCredentialsProvider.NAME; + + private static final long TEST_FILE_SIZE = 1024; + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + /** + * Test use of STS for requesting temporary credentials. + * + * The property test.sts.endpoint can be set to point this at different + * STS endpoints. This test will use the AWS credentials (if provided) for + * S3A tests to request temporary credentials, then attempt to use those + * credentials instead. + * + * @throws IOException + */ + @Test + public void testSTS() throws IOException { + Configuration conf = getContract().getConf(); + if (!conf.getBoolean(TEST_STS_ENABLED, true)) { + skip("STS functional tests disabled"); + } + + S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys( + URI.create("s3a://foobar"), conf); + if (!login.hasLogin()) { + skip("testSTS disabled because AWS credentials not configured"); + } + AWSCredentialsProvider parentCredentials = new BasicAWSCredentialsProvider( + login.getUser(), login.getPassword()); + + String stsEndpoint = conf.getTrimmed(TEST_STS_ENDPOINT, ""); + AWSSecurityTokenServiceClient stsClient; + stsClient = new AWSSecurityTokenServiceClient(parentCredentials); + if (!stsEndpoint.isEmpty()) { + LOG.debug("STS Endpoint ={}", stsEndpoint); + stsClient.setEndpoint(stsEndpoint); + } + GetSessionTokenRequest sessionTokenRequest = new GetSessionTokenRequest(); + sessionTokenRequest.setDurationSeconds(900); + GetSessionTokenResult sessionTokenResult; + sessionTokenResult = stsClient.getSessionToken(sessionTokenRequest); + Credentials sessionCreds = sessionTokenResult.getCredentials(); + + String childAccessKey = sessionCreds.getAccessKeyId(); + conf.set(ACCESS_KEY, childAccessKey); + String childSecretKey = sessionCreds.getSecretAccessKey(); + conf.set(SECRET_KEY, childSecretKey); + String sessionToken = sessionCreds.getSessionToken(); + conf.set(SESSION_TOKEN, sessionToken); + + conf.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS); + + try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + createAndVerifyFile(fs, path("testSTS"), TEST_FILE_SIZE); + } + + // now create an invalid set of credentials by changing the session + // token + conf.set(SESSION_TOKEN, "invalid-" + sessionToken); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE); + fail("Expected an access exception, but file access to " + + fs.getUri() + " was allowed: " + fs); + } catch (AWSS3IOException ex) { + LOG.info("Expected Exception: {}", ex.toString()); + LOG.debug("Expected Exception: {}", ex, ex); + } + } + + @Test + public void testTemporaryCredentialValidation() throws Throwable { + Configuration conf = new Configuration(); + conf.set(ACCESS_KEY, "accesskey"); + conf.set(SECRET_KEY, "secretkey"); + conf.set(SESSION_TOKEN, ""); + TemporaryAWSCredentialsProvider provider + = new TemporaryAWSCredentialsProvider(getFileSystem().getUri(), conf); + try { + AWSCredentials credentials = provider.getCredentials(); + fail("Expected a CredentialInitializationException," + + " got " + credentials); + } catch (CredentialInitializationException expected) { + // expected + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java deleted file mode 100644 index 25a8958..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.util.StopWatch; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -/** - * Basic unit test for S3A's blocking executor service. - */ -public class TestBlockingThreadPoolExecutorService { - - private static final Logger LOG = LoggerFactory.getLogger( - BlockingThreadPoolExecutorService.class); - - private static final int NUM_ACTIVE_TASKS = 4; - private static final int NUM_WAITING_TASKS = 2; - private static final int TASK_SLEEP_MSEC = 100; - private static final int SHUTDOWN_WAIT_MSEC = 200; - private static final int SHUTDOWN_WAIT_TRIES = 5; - private static final int BLOCKING_THRESHOLD_MSEC = 50; - - private static final Integer SOME_VALUE = 1337; - - private static BlockingThreadPoolExecutorService tpe = null; - - @AfterClass - public static void afterClass() throws Exception { - ensureDestroyed(); - } - - /** - * Basic test of running one trivial task. - */ - @Test - public void testSubmitCallable() throws Exception { - ensureCreated(); - ListenableFuture f = tpe.submit(callableSleeper); - Integer v = f.get(); - assertEquals(SOME_VALUE, v); - } - - /** - * More involved test, including detecting blocking when at capacity. - */ - @Test - public void testSubmitRunnable() throws Exception { - ensureCreated(); - int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS; - StopWatch stopWatch = new StopWatch().start(); - for (int i = 0; i < totalTasks; i++) { - tpe.submit(sleeper); - assertDidntBlock(stopWatch); - } - tpe.submit(sleeper); - assertDidBlock(stopWatch); - } - - @Test - public void testShutdown() throws Exception { - // Cover create / destroy, regardless of when this test case runs - ensureCreated(); - ensureDestroyed(); - - // Cover create, execute, destroy, regardless of when test case runs - ensureCreated(); - testSubmitRunnable(); - ensureDestroyed(); - } - - // Helper functions, etc. - - private void assertDidntBlock(StopWatch sw) { - try { - assertFalse("Non-blocking call took too long.", - sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC); - } finally { - sw.reset().start(); - } - } - - private void assertDidBlock(StopWatch sw) { - try { - if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) { - throw new RuntimeException("Blocking call returned too fast."); - } - } finally { - sw.reset().start(); - } - } - - private Runnable sleeper = new Runnable() { - @Override - public void run() { - String name = Thread.currentThread().getName(); - try { - Thread.sleep(TASK_SLEEP_MSEC); - } catch (InterruptedException e) { - LOG.info("Thread {} interrupted.", name); - Thread.currentThread().interrupt(); - } - } - }; - - private Callable callableSleeper = new Callable() { - @Override - public Integer call() throws Exception { - sleeper.run(); - return SOME_VALUE; - } - }; - - /** - * Helper function to create thread pool under test. - */ - private static void ensureCreated() throws Exception { - if (tpe == null) { - LOG.debug("Creating thread pool"); - tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS, - NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest"); - } - } - - /** - * Helper function to terminate thread pool under test, asserting that - * shutdown -> terminate works as expected. - */ - private static void ensureDestroyed() throws Exception { - if (tpe == null) { - return; - } - int shutdownTries = SHUTDOWN_WAIT_TRIES; - - tpe.shutdown(); - if (!tpe.isShutdown()) { - throw new RuntimeException("Shutdown had no effect."); - } - - while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, - TimeUnit.MILLISECONDS)) { - LOG.info("Waiting for thread pool shutdown."); - if (shutdownTries-- <= 0) { - LOG.error("Failed to terminate thread pool gracefully."); - break; - } - } - if (!tpe.isTerminated()) { - tpe.shutdownNow(); - if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, - TimeUnit.MILLISECONDS)) { - throw new RuntimeException( - "Failed to terminate thread pool in timely manner."); - } - } - tpe = null; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org