From jira-return-9882-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon Feb 12 23:30:10 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 703A5180652 for ; Mon, 12 Feb 2018 23:30:10 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5FD0A160C3F; Mon, 12 Feb 2018 22:30:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3431B160C30 for ; Mon, 12 Feb 2018 23:30:09 +0100 (CET) Received: (qmail 13349 invoked by uid 500); 12 Feb 2018 22:30:08 -0000 Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@kafka.apache.org Delivered-To: mailing list jira@kafka.apache.org Received: (qmail 13338 invoked by uid 99); 12 Feb 2018 22:30:08 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Feb 2018 22:30:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id EED66C004A for ; Mon, 12 Feb 2018 22:30:07 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.511 X-Spam-Level: X-Spam-Status: No, score=-109.511 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ca6mANMRzPA5 for ; Mon, 12 Feb 2018 22:30:04 +0000 (UTC) Received: from 
mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id E9E835FB02 for ; Mon, 12 Feb 2018 22:30:02 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id F0122E033A for ; Mon, 12 Feb 2018 22:30:01 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 869BB2411E for ; Mon, 12 Feb 2018 22:30:00 +0000 (UTC) Date: Mon, 12 Feb 2018 22:30:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: jira@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (KAFKA-5944) Add unit tests for handling of authentication failures in clients MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361546#comment-16361546 ] ASF GitHub Bot commented on KAFKA-5944: --------------------------------------- hachikuji closed pull request #3965: KAFKA-5944: Unit tests for handling SASL authentication failures in clients URL: https://github.com/apache/kafka/pull/3965 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index d843414fd7a..65255fe9648 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.utils.Time; @@ -77,6 +78,7 @@ public FutureResponse(Node node, private Node node = null; private final Set ready = new HashSet<>(); private final Map blackedOut = new HashMap<>(); + private final Map authenticationException = new HashMap<>(); // Use concurrent queue for requests so that requests may be queried from a different thread private final Queue requests = new ConcurrentLinkedDeque<>(); // Use concurrent queue for responses so that responses may be updated during poll() from a different thread. 
@@ -102,7 +104,7 @@ public boolean isReady(Node node, long now) { @Override public boolean ready(Node node, long now) { - if (isBlackedOut(node)) + if (isBlackedOut(node) || authenticationException(node) != null) return false; ready.add(node.idString()); return true; @@ -117,6 +119,12 @@ public void blackout(Node node, long duration) { blackedOut.put(node, time.milliseconds() + duration); } + public void authenticationFailed(Node node, long duration) { + authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception()); + disconnect(node.idString()); + blackout(node, duration); + } + private boolean isBlackedOut(Node node) { if (blackedOut.containsKey(node)) { long expiration = blackedOut.get(node); @@ -137,7 +145,7 @@ public boolean connectionFailed(Node node) { @Override public AuthenticationException authenticationException(Node node) { - return null; + return authenticationException.get(node); } @Override @@ -347,6 +355,7 @@ public void reset() { responses.clear(); futureResponses.clear(); metadataUpdates.clear(); + authenticationException.clear(); } public void prepareMetadataUpdate(Cluster cluster, Set unavailableTopics) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 186ccf06cb5..f08a99b6ddc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import 
org.apache.kafka.common.errors.NotLeaderForPartitionException; @@ -248,6 +249,75 @@ public void testInvalidTopicNames() throws Exception { } } + @Test + public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception { + AdminClientUnitTestEnv env = mockClientEnv(); + Node node = env.cluster().controller(); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNode(node); + env.kafkaClient().authenticationFailed(node, 300); + + callAdminClientApisAndExpectAnAuthenticationError(env); + + // wait less than the blackout period, the connection should fail and the authentication error should remain + env.time().sleep(30); + assertTrue(env.kafkaClient().connectionFailed(node)); + callAdminClientApisAndExpectAnAuthenticationError(env); + + env.close(); + } + + private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException { + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + + try { + env.adminClient().createTopics( + Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))), + new CreateTopicsOptions().timeoutMs(10000)).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + Map counts = new HashMap<>(); + counts.put("my_topic", NewPartitions.increaseTo(3)); + counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3)))); + env.adminClient().createPartitions(counts).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().createAcls(asList(ACL1, ACL2)).all().get(); + fail("Expected an authentication 
error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().describeAcls(FILTER1).values().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + } + private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"), @@ -579,7 +649,7 @@ public static KafkaAdminClient createInternal(AdminClientConfig config, KafkaAdm private int numTries = 0; private int failuresInjected = 0; - + @Override public KafkaAdminClient.TimeoutProcessor create(long now) { return new FailureInjectingTimeoutProcessor(now); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index d47124f8f3e..be8db2bf55c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -32,6 +32,7 @@ import 
org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; @@ -1431,6 +1432,88 @@ public boolean conditionMet() { } } + @Test + public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + Map tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + + Metadata metadata = createMetadata(); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + client.authenticationFailed(node, 300); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + consumer.subscribe(Collections.singleton(topic)); + callConsumerApisAndExpectAnAuthenticationError(consumer, tp0); + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + callConsumerApisAndExpectAnAuthenticationError(consumer, tp0); + + client.requests().clear(); + consumer.close(0, TimeUnit.MILLISECONDS); + } + + private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer consumer, TopicPartition partition) { + try { + consumer.partitionsFor("some other topic"); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.beginningOffsets(Collections.singleton(partition)); + fail("Expected 
an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.endOffsets(Collections.singleton(partition)); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.poll(10); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + Map offset = new HashMap<>(); + offset.put(partition, new OffsetAndMetadata(10L)); + + try { + consumer.commitSync(offset); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.committed(partition); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + } + private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer consumer) { return new ConsumerRebalanceListener() { @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 7eaca9826d9..1c88803e26c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -519,6 +520,30 @@ public void testWakeupInOnJoinComplete() throws Exception { awaitFirstHeartbeat(heartbeatReceived); } + @Test + public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() { + setupCoordinator(RETRY_BACKOFF_MS); + + mockClient.authenticationFailed(node, 300); + + 
try { + coordinator.ensureCoordinatorReady(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + + mockTime.sleep(30); // wait less than the blackout period + assertTrue(mockClient.connectionFailed(node)); + + try { + coordinator.ensureCoordinatorReady(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + } + private AtomicBoolean prepareFirstHeartbeat() { final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); mockClient.prepareResponse(new MockClient.RequestMatcher() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c49339b6525..fdaa6b3f522 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.OffsetMetadataTooLarge; @@ -1503,6 +1504,28 @@ public void testRefreshOffsetWithNoFetchableOffsets() { assertEquals(null, subscriptions.committed(t1p)); } + @Test + public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() { + client.authenticationFailed(node, 300); + + try { + coordinator.ensureActiveGroup(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + + try { + coordinator.ensureActiveGroup(); + 
fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + } + @Test public void testProtocolMetadataOrder() { RoundRobinAssignor roundRobin = new RoundRobinAssignor(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 93c6acd9c40..904270ec489 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; @@ -69,6 +70,24 @@ public void send() { assertEquals(Errors.NONE, response.error()); } + @Test + public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws InterruptedException { + client.authenticationFailed(node, 300); + client.prepareResponse(heartbeatResponse(Errors.NONE)); + final RequestFuture future = consumerClient.send(node, heartbeat()); + consumerClient.poll(future); + assertTrue(future.failed()); + assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException); + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + + final RequestFuture future2 = consumerClient.send(node, heartbeat()); + consumerClient.poll(future2); + assertTrue(future2.failed()); + assertTrue("Expected only an authentication error.", future2.exception() instanceof AuthenticationException); + } + @Test public void multiSend() { 
client.prepareResponse(heartbeatResponse(Errors.NONE)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org > Add unit tests for handling of authentication failures in clients > ----------------------------------------------------------------- > > Key: KAFKA-5944 > URL: https://issues.apache.org/jira/browse/KAFKA-5944 > Project: Kafka > Issue Type: Test > Components: clients > Reporter: Rajini Sivaram > Assignee: Vahid Hashemian > Priority: Major > Fix For: 2.0.0 > > > KAFKA-5854 improves the handling of authentication failures in clients and has added integration tests and some basic client-side tests that create actual connections to a mock server. It would be good to add a set of tests for producers, consumers, etc. that use MockClient to cover various scenarios more extensively. > cc [~hachikuji] [~vahid] -- This message was sent by Atlassian JIRA (v7.6.3#76005)