Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F290D200CD4 for ; Sat, 15 Jul 2017 07:23:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F104716D9C1; Sat, 15 Jul 2017 05:23:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 730B916D9B7 for ; Sat, 15 Jul 2017 07:23:17 +0200 (CEST) Received: (qmail 35659 invoked by uid 500); 15 Jul 2017 05:23:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 35650 invoked by uid 99); 15 Jul 2017 05:23:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Jul 2017 05:23:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1AC05E04AA; Sat, 15 Jul 2017 05:23:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xiao@apache.org To: common-commits@hadoop.apache.org Message-Id: <7be8786bae1a4621a32e39d744661ee5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HADOOP-14521. KMS client needs retry logic. Contributed by Rushabh S Shah. Date: Sat, 15 Jul 2017 05:23:16 +0000 (UTC) archived-at: Sat, 15 Jul 2017 05:23:19 -0000 Repository: hadoop Updated Branches: refs/heads/branch-2.8 e28c74102 -> 8b7d1df76 HADOOP-14521. KMS client needs retry logic. Contributed by Rushabh S Shah. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b7d1df7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b7d1df7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b7d1df7 Branch: refs/heads/branch-2.8 Commit: 8b7d1df766002ae033d5fca8017cfe7141803c58 Parents: e28c741 Author: Xiao Chen Authored: Fri Jul 14 22:22:40 2017 -0700 Committer: Xiao Chen Committed: Fri Jul 14 22:23:13 2017 -0700 ---------------------------------------------------------------------- .../crypto/key/kms/KMSClientProvider.java | 39 ++- .../key/kms/LoadBalancingKMSClientProvider.java | 78 ++++- .../fs/CommonConfigurationKeysPublic.java | 29 ++ .../src/main/resources/core-default.xml | 28 ++ .../kms/TestLoadBalancingKMSClientProvider.java | 315 ++++++++++++++++++- .../hadoop/hdfs/TestEncryptionZonesWithKMS.java | 19 +- 6 files changed, 465 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 3ce7f1db..39d467f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -305,9 +305,8 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, * - HOSTNAME = string * - PORT = integer * - * If multiple hosts are provider, the Factory will create a - * {@link LoadBalancingKMSClientProvider} that round-robins requests - * across the provided list of hosts. + * This will always create a {@link LoadBalancingKMSClientProvider} + * if the uri is correct. */ @Override public KeyProvider createProvider(URI providerUri, Configuration conf) @@ -334,30 +333,26 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } hostsPart = t[0]; } - return createProvider(providerUri, conf, origUrl, port, hostsPart); + return createProvider(conf, origUrl, port, hostsPart); } return null; } - private KeyProvider createProvider(URI providerUri, Configuration conf, + private KeyProvider createProvider(Configuration conf, URL origUrl, int port, String hostsPart) throws IOException { String[] hosts = hostsPart.split(";"); - if (hosts.length == 1) { - return new KMSClientProvider(providerUri, conf); - } else { - KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; - for (int i = 0; i < hosts.length; i++) { - try { - providers[i] = - new KMSClientProvider( - new URI("kms", origUrl.getProtocol(), hosts[i], port, - origUrl.getPath(), null, null), conf); - } catch (URISyntaxException e) { - throw new IOException("Could not instantiate KMSProvider..", e); - } + KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + try { + providers[i] = + new KMSClientProvider( + new URI("kms", origUrl.getProtocol(), hosts[i], port, + origUrl.getPath(), null, null), conf); + } catch (URISyntaxException e) { + throw new IOException("Could not instantiate KMSProvider.", e); } - return new LoadBalancingKMSClientProvider(providers, conf); } + return new LoadBalancingKMSClientProvider(providers, conf); } } @@ -1024,7 +1019,11 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { - throw new IOException(e); + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } } } return tokens; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java index e17b507..44d5a1d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.crypto.key.kms; import java.io.IOException; +import java.io.InterruptedIOException; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; @@ -31,6 +32,11 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; @@ -68,6 +74,8 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements private final KMSClientProvider[] providers; private final AtomicInteger currentIdx; + private RetryPolicy retryPolicy = null; + public LoadBalancingKMSClientProvider(KMSClientProvider[] providers, Configuration conf) { this(shuffle(providers), Time.monotonicNow(), conf); @@ -79,24 +87,80 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements super(conf); this.providers = providers; this.currentIdx = new AtomicInteger((int)(seed % providers.length)); + int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length); + int sleepBaseMillis = conf.getInt(CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, + CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT); + int sleepMaxMillis = conf.getInt(CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY, + CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT); + Preconditions.checkState(maxNumRetries >= 0); + Preconditions.checkState(sleepBaseMillis >= 0); + Preconditions.checkState(sleepMaxMillis >= 0); + this.retryPolicy = RetryPolicies.failoverOnNetworkException( + RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis, + sleepMaxMillis); } @VisibleForTesting - KMSClientProvider[] getProviders() { + public KMSClientProvider[] getProviders() { return providers; } private T doOp(ProviderCallable op, int currPos) throws IOException { + + if (providers.length == 0) { + throw new IOException("No providers configured !"); + } IOException ex = null; - for (int i = 0; i < providers.length; i++) { + int numFailovers = 0; + for (int i = 0;; i++, numFailovers++) { KMSClientProvider provider = providers[(currPos + i) % providers.length]; try { return op.call(provider); + } catch (AccessControlException ace) { + // No need to retry on AccessControlException + // and AuthorizationException. + // This assumes all the servers are configured with identical + // permissions and identical key acls. + throw ace; } catch (IOException ioe) { - LOG.warn("KMS provider at [{}] threw an IOException [{}]!!", - provider.getKMSUrl(), ioe.getMessage()); + LOG.warn("KMS provider at [{}] threw an IOException: ", + provider.getKMSUrl(), ioe); ex = ioe; + + RetryAction action = null; + try { + action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException)e; + } + throw new IOException(e); + } + if (action.action == RetryAction.RetryDecision.FAIL) { + LOG.warn("Aborting since the Request has failed with all KMS" + + " providers(depending on {}={} setting and numProviders={})" + + " in the group OR the exception is not recoverable", + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, + getConf().getInt( + CommonConfigurationKeysPublic. + KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length), + providers.length); + throw ex; + } + if (((numFailovers + 1) % providers.length) == 0) { + // Sleep only after we try all the providers for every cycle. + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + throw new InterruptedIOException("Thread Interrupted"); + } + } } catch (Exception e) { if (e instanceof RuntimeException) { throw (RuntimeException)e; @@ -105,12 +169,6 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements } } } - if (ex != null) { - LOG.warn("Aborting since the Request has failed with all KMS" - + " providers in the group. !!"); - throw ex; - } - throw new IOException("No providers configured !!"); } private int nextIdx() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index dfc34f2..e127350 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -690,6 +690,35 @@ public class CommonConfigurationKeysPublic { * * core-default.xml */ + /** Default value is the number of providers specified. */ + public static final String KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY = + "hadoop.security.kms.client.failover.max.retries"; + + /** + * @see + * + * core-default.xml + */ + public static final String KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY = + "hadoop.security.kms.client.failover.sleep.base.millis"; + /** Default value is 100 ms. */ + public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100; + + /** + * @see + * + * core-default.xml + */ + public static final String KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY = + "hadoop.security.kms.client.failover.sleep.max.millis"; + /** Default value is 2 secs. */ + public static final int KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT = 2000; + + /** + * @see + * + * core-default.xml + */ public static final String HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_KEY = "hadoop.security.java.secure.random.algorithm"; /** Defalt value for HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_KEY */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index f319205..b83a328 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2130,6 +2130,34 @@ + + hadoop.security.kms.client.failover.sleep.base.millis + 100 + + Expert only. The time to wait, in milliseconds, between failover + attempts increases exponentially as a function of the number of + attempts made so far, with a random factor of +/- 50%. This option + specifies the base value used in the failover calculation. The + first failover will retry immediately. The 2nd failover attempt + will delay at least hadoop.security.client.failover.sleep.base.millis + milliseconds. And so on. + + + + + hadoop.security.kms.client.failover.sleep.max.millis + 2000 + + Expert only. The time to wait, in milliseconds, between failover + attempts increases exponentially as a function of the number of + attempts made so far, with a random factor of +/- 50%. This option + specifies the maximum value to wait between failovers. + Specifically, the time between two failover attempts will not + exceed +/- 50% of hadoop.security.client.failover.sleep.max.millis + milliseconds. + + + ipc.server.max.connections 0 http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java index 8a2e87f..1b5a66f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java @@ -23,9 +23,12 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.net.NoRouteToHostException; import java.net.URI; +import java.net.UnknownHostException; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; @@ -33,6 +36,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider.Options; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.net.ConnectTimeoutException; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authorize.AuthorizationException; import org.junit.Test; @@ -47,14 +53,17 @@ public class TestLoadBalancingKMSClientProvider { Configuration conf = new Configuration(); KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI( "kms://http@host1/kms/foo"), conf); - assertTrue(kp instanceof KMSClientProvider); - assertEquals("http://host1/kms/foo/v1/", - ((KMSClientProvider) kp).getKMSUrl()); + assertTrue(kp instanceof LoadBalancingKMSClientProvider); + KMSClientProvider[] providers = + ((LoadBalancingKMSClientProvider) kp).getProviders(); + assertEquals(1, providers.length); + assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"), + Sets.newHashSet(providers[0].getKMSUrl())); kp = new KMSClientProvider.Factory().createProvider(new URI( "kms://http@host1;host2;host3/kms/foo"), conf); assertTrue(kp instanceof LoadBalancingKMSClientProvider); - KMSClientProvider[] providers = + providers = ((LoadBalancingKMSClientProvider) kp).getProviders(); assertEquals(3, providers.length); assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/", @@ -122,7 +131,7 @@ public class TestLoadBalancingKMSClientProvider { // This should be retried KMSClientProvider p4 = mock(KMSClientProvider.class); when(p4.createKey(Mockito.anyString(), Mockito.any(Options.class))) - .thenThrow(new IOException("p4")); + .thenThrow(new ConnectTimeoutException("p4")); when(p4.getKMSUrl()).thenReturn("p4"); KeyProvider kp = new LoadBalancingKMSClientProvider( new KMSClientProvider[] { p1, p2, p3, p4 }, 0, conf); @@ -320,4 +329,298 @@ public class TestLoadBalancingKMSClientProvider { Mockito.verify(p1, Mockito.times(1)).warmUpEncryptedKeys(keyName); Mockito.verify(p2, Mockito.times(1)).warmUpEncryptedKeys(keyName); } -} + + /** + * Tests whether retryPolicy fails immediately on encountering IOException + * which is not SocketException. + * @throws Exception + */ + @Test + public void testClientRetriesWithIOException() throws Exception { + Configuration conf = new Configuration(); + // Setting total failover attempts to . + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.getMetadata(Mockito.anyString())) + .thenThrow(new IOException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.getMetadata(Mockito.anyString())) + .thenThrow(new IOException("p2")); + KMSClientProvider p3 = mock(KMSClientProvider.class); + when(p3.getMetadata(Mockito.anyString())) + .thenThrow(new IOException("p3")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + when(p3.getKMSUrl()).thenReturn("p3"); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2, p3}, 0, conf); + try { + kp.getMetadata("test3"); + fail("Should fail since all providers threw an IOException"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + } + verify(kp.getProviders()[0], Mockito.times(1)) + .getMetadata(Mockito.eq("test3")); + verify(kp.getProviders()[1], Mockito.never()) + .getMetadata(Mockito.eq("test3")); + verify(kp.getProviders()[2], Mockito.never()) + .getMetadata(Mockito.eq("test3")); + } + + /** + * Tests that client doesn't retry once it encounters AccessControlException + * from first provider. + * This assumes all the kms servers are configured with identical access to + * keys. + * @throws Exception + */ + @Test + public void testClientRetriesWithAccessControlException() throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new AccessControlException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new IOException("p2")); + KMSClientProvider p3 = mock(KMSClientProvider.class); + when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new IOException("p3")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + when(p3.getKMSUrl()).thenReturn("p3"); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2, p3}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + fail("Should fail because provider p1 threw an AccessControlException"); + } catch (Exception e) { + assertTrue(e instanceof AccessControlException); + } + verify(p1, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.never()).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p3, Mockito.never()).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests that client doesn't retry once it encounters RunTimeException + * from first provider. + * This assumes all the kms servers are configured with identical access to + * keys. + * @throws Exception + */ + @Test + public void testClientRetriesWithRuntimeException() throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new RuntimeException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new IOException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + fail("Should fail since provider p1 threw RuntimeException"); + } catch (Exception e) { + assertTrue(e instanceof RuntimeException); + } + verify(p1, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.never()).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests the client retries until it finds a good provider. + * @throws Exception + */ + @Test + public void testClientRetriesWithTimeoutsException() throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 4); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new UnknownHostException("p2")); + KMSClientProvider p3 = mock(KMSClientProvider.class); + when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new NoRouteToHostException("p3")); + KMSClientProvider p4 = mock(KMSClientProvider.class); + when(p4.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenReturn( + new KMSClientProvider.KMSKeyVersion("test3", "v1", new byte[0])); + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + when(p3.getKMSUrl()).thenReturn("p3"); + when(p4.getKMSUrl()).thenReturn("p4"); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2, p3, p4}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + } catch (Exception e) { + fail("Provider p4 should have answered the request."); + } + verify(p1, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p3, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p4, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests the operation succeeds second time after ConnectTimeoutException. + * @throws Exception + */ + @Test + public void testClientRetriesSucceedsSecondTime() throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p1")) + .thenReturn(new KMSClientProvider.KMSKeyVersion("test3", "v1", + new byte[0])); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + } catch (Exception e) { + fail("Provider p1 should have answered the request second time."); + } + verify(p1, Mockito.times(2)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests whether retryPolicy retries specified number of times. + * @throws Exception + */ + @Test + public void testClientRetriesSpecifiedNumberOfTimes() throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + fail("Should fail"); + } catch (Exception e) { + assert (e instanceof ConnectTimeoutException); + } + verify(p1, Mockito.times(6)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.times(5)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests whether retryPolicy retries number of times equals to number of + * providers if conf kms.client.failover.max.attempts is not set. + * @throws Exception + */ + @Test + public void testClientRetriesIfMaxAttemptsNotSet() throws Exception { + Configuration conf = new Configuration(); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + fail("Should fail"); + } catch (Exception e) { + assert (e instanceof ConnectTimeoutException); + } + verify(p1, Mockito.times(2)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } + + /** + * Tests that client doesn't retry once it encounters AuthenticationException + * wrapped in an IOException from first provider. + * @throws Exception + */ + @Test + public void testClientRetriesWithAuthenticationExceptionWrappedinIOException() + throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new IOException(new AuthenticationException("p1"))); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new ConnectTimeoutException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + try { + kp.createKey("test3", new Options(conf)); + fail("Should fail since provider p1 threw AuthenticationException"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof AuthenticationException); + } + verify(p1, Mockito.times(1)).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + verify(p2, Mockito.never()).createKey(Mockito.eq("test3"), + Mockito.any(Options.class)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b7d1df7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java index 959e724..6f53362 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import com.google.common.base.Supplier; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; +import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider; import org.apache.hadoop.crypto.key.kms.server.MiniKMS; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -69,14 +70,21 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones { protected void setProvider() { } + private KMSClientProvider getKMSClientProvider() { + LoadBalancingKMSClientProvider lbkmscp = + (LoadBalancingKMSClientProvider) Whitebox + .getInternalState(cluster.getNamesystem().getProvider(), "extension"); + assert lbkmscp.getProviders().length == 1; + return lbkmscp.getProviders()[0]; + } + @Test(timeout = 120000) public void testCreateEZPopulatesEDEKCache() throws Exception { final Path zonePath = new Path("/TestEncryptionZone"); fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH); @SuppressWarnings("unchecked") - KMSClientProvider kcp = (KMSClientProvider) Whitebox - .getInternalState(cluster.getNamesystem().getProvider(), "extension"); + KMSClientProvider kcp = getKMSClientProvider(); assertTrue(kcp.getEncKeyQueueSize(TEST_KEY) > 0); } @@ -110,8 +118,7 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones { dfsAdmin.createEncryptionZone(zonePath, anotherKey, NO_TRASH); @SuppressWarnings("unchecked") - KMSClientProvider spy = (KMSClientProvider) Whitebox - .getInternalState(cluster.getNamesystem().getProvider(), "extension"); + KMSClientProvider spy = getKMSClientProvider(); assertTrue("key queue is empty after creating encryption zone", spy.getEncKeyQueueSize(TEST_KEY) > 0); @@ -122,9 +129,7 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - final KMSClientProvider kspy = (KMSClientProvider) Whitebox - .getInternalState(cluster.getNamesystem().getProvider(), - "extension"); + final KMSClientProvider kspy = getKMSClientProvider(); return kspy.getEncKeyQueueSize(TEST_KEY) > 0; } }, 1000, 60000); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org