kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3618; Handle ApiVersionsRequest before SASL authentication
Date Fri, 29 Apr 2016 18:15:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e50327331 -> 69d9a669d


KAFKA-3618; Handle ApiVersionsRequest before SASL authentication

Server-side implementation and tests for handling ApiVersionsRequest before SaslHandshakeRequest.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Gwen Shapira, Ismael Juma

Closes #1286 from rajinisivaram/KAFKA-3618


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69d9a669
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69d9a669
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69d9a669

Branch: refs/heads/trunk
Commit: 69d9a669d7bbfec1e33dd6177c5687ef7f9977df
Parents: e503273
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri Apr 29 11:15:20 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Apr 29 11:15:20 2016 -0700

----------------------------------------------------------------------
 .../common/requests/ApiVersionsResponse.java    |  14 ++
 .../authenticator/SaslServerAuthenticator.java  |  10 ++
 .../kafka/common/network/NetworkTestUtils.java  |  14 +-
 .../authenticator/SaslAuthenticatorTest.java    | 159 ++++++++++++++++++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |  23 +--
 .../unit/kafka/server/ApiVersionsTest.scala     |   6 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  24 +--
 .../server/SaslApiVersionsRequestTest.scala     |  78 +++++++++
 9 files changed, 297 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 36881a3..fe995b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
@@ -29,6 +30,7 @@ import java.util.Map;
 public class ApiVersionsResponse extends AbstractRequestResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
+    private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
 
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
@@ -106,6 +108,18 @@ public class ApiVersionsResponse extends AbstractRequestResponse {
         return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList());
     }
 
+    public static ApiVersionsResponse apiVersionsResponse() {
+        return API_VERSIONS_RESPONSE;
+    }
+
+    private static ApiVersionsResponse createApiVersionsResponse() {
+        List<ApiVersion> versionList = new ArrayList<>();
+        for (ApiKeys apiKey : ApiKeys.values()) {
+            versionList.add(new ApiVersion(apiKey.id, Protocol.MIN_VERSIONS[apiKey.id], Protocol.CURR_VERSION[apiKey.id]));
+        }
+        return new ApiVersionsResponse(Errors.NONE.code(), versionList);
+    }
+
     private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions)
{
         Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
         for (ApiVersion apiVersion: apiVersions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 89c6e6c..a9c19a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -61,6 +61,8 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.ResponseSend;
@@ -290,7 +292,11 @@ public class SaslServerAuthenticator implements Authenticator {
             isKafkaRequest = true;
 
             ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+            LOG.debug("Handle Kafka request {}", apiKey);
             switch (apiKey) {
+                case API_VERSIONS:
+                    handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+                    break;
                 case SASL_HANDSHAKE:
                     clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest)
request);
                     break;
@@ -336,6 +342,10 @@ public class SaslServerAuthenticator implements Authenticator {
         }
     }
 
+    private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest
versionRequest) throws IOException, UnsupportedSaslMechanismException {
+        sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
+    }
+
     private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response)
throws IOException {
         ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
         netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 53ba954..969055d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -42,13 +42,10 @@ public class NetworkTestUtils {
 
     public static void checkClientConnection(Selector selector, String node, int minMessageSize,
int messageCount) throws Exception {
 
+        waitForChannelReady(selector, node);
         String prefix = TestUtils.randomString(minMessageSize);
         int requests = 0;
         int responses = 0;
-        // wait for handshake to finish
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
         selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
         requests++;
         while (responses < messageCount) {
@@ -66,6 +63,15 @@ public class NetworkTestUtils {
         }
     }
 
+    public static void waitForChannelReady(Selector selector, String node) throws IOException
{
+        // wait for handshake to finish
+        int secondsLeft = 30;
+        while (!selector.isChannelReady(node) && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertTrue(selector.isChannelReady(node));
+    }
+
     public static void waitForChannelClose(Selector selector, String node) throws IOException
{
         boolean closed = false;
         for (int i = 0; i < 30; i++) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 0a4928b..368b5a7 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -17,11 +17,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.CertStores;
@@ -34,11 +37,18 @@ import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -210,6 +220,30 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
+     * prior to SASL handshake flow and that subsequent authentication succeeds
+     * when transport layer is PLAINTEXT. This test simulates SASL authentication using a
+     * (non-SASL) PLAINTEXT client and sends ApiVersionsRequest straight after
+     * connection to the server is established, before any SASL-related packets are sent.
+     */
+    @Test
+    public void testUnauthenticatedApiVersionsRequestOverPlaintext() throws Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT);
+    }
+
+    /**
+     * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
+     * prior to SASL handshake flow and that subsequent authentication succeeds
+     * when transport layer is SSL. This test simulates SASL authentication using a
+     * (non-SASL) SSL client and sends ApiVersionsRequest straight after
+     * SSL handshake, before any SASL-related packets are sent.
+     */
+    @Test
+    public void testUnauthenticatedApiVersionsRequestOverSsl() throws Exception {
+        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL);
+    }
+
+    /**
      * Tests that any invalid data during Kafka SASL handshake request flow
      * or the actual SASL authentication flow result in authentication failure
      * and do not cause any failures in the server.
@@ -223,7 +257,7 @@ public class SaslAuthenticatorTest {
         // Send invalid SASL packet after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequest(node1);
+        sendHandshakeRequestReceiveResponse(node1);
         Random random = new Random();
         byte[] bytes = new byte[1024];
         random.nextBytes(bytes);
@@ -247,6 +281,33 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests that ApiVersionsRequest after Kafka SASL handshake request flow,
+     * but prior to actual SASL authentication, results in authentication failure.
+     * This is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * where a non-SASL client is used to send requests that are processed by
+     * {@link SaslServerAuthenticator} of the server prior to client authentication.
+     */
+    @Test
+    public void testInvalidApiVersionsRequestSequence() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Send handshake request followed by ApiVersionsRequest
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        sendHandshakeRequestReceiveResponse(node1);
+
+        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient",
2);
+        selector.send(new NetworkSend(node1, RequestSend.serialize(versionsHeader, new ApiVersionsRequest().toStruct())));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+    }
+
+    /**
      * Tests that packets that are too big during Kafka SASL handshake request flow
      * or the actual SASL authentication flow result in authentication failure
      * and do not cause any failures in the server.
@@ -260,7 +321,7 @@ public class SaslAuthenticatorTest {
         // Send SASL packet with large size after valid handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        sendHandshakeRequest(node1);
+        sendHandshakeRequestReceiveResponse(node1);
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.putInt(Integer.MAX_VALUE);
         buffer.put(new byte[buffer.capacity() - 4]);
@@ -312,7 +373,7 @@ public class SaslAuthenticatorTest {
         // Send metadata request after Kafka SASL handshake request
         String node2 = "invalid2";
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
-        sendHandshakeRequest(node2);
+        sendHandshakeRequestReceiveResponse(node2);
         RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient",
2);
         MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic"));
         selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2,
metadataRequest2.toStruct())));
@@ -371,6 +432,68 @@ public class SaslAuthenticatorTest {
         NetworkTestUtils.waitForChannelClose(selector, node);
     }
 
+    /**
+     * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
+     * prior to SASL handshake flow and that subsequent authentication succeeds
+     * when transport layer is PLAINTEXT/SSL. This test uses a non-SASL client that simulates
+     * SASL authentication after ApiVersionsRequest.
+     * <p>
+     * Test sequence (using <tt>securityProtocol=SASL_PLAINTEXT</tt> as an example):
+     * <ol>
+     *   <li>Starts a SASL_PLAINTEXT test server that simply echoes back client requests
after authentication.</li>
+     *   <li>A (non-SASL) PLAINTEXT test client connects to the SASL server port. Client
is now unauthenticated.</li>
+     *   <li>The unauthenticated non-SASL client sends an ApiVersionsRequest and validates
the response.
+     *       A valid response indicates that {@link SaslServerAuthenticator} of the test
server responded to
+     *       the ApiVersionsRequest even though the client is not yet authenticated.</li>
+     *   <li>The unauthenticated non-SASL client sends a SaslHandshakeRequest and validates
the response. A valid response
+     *       indicates that {@link SaslServerAuthenticator} of the test server responded
to the SaslHandshakeRequest
+     *       after processing ApiVersionsRequest.</li>
+     *   <li>The unauthenticated non-SASL client sends the SASL/PLAIN packet containing
username/password to authenticate
+     *       itself. The client is now authenticated by the server. At this point this test
client is at the
+     *       same state as a regular SASL_PLAINTEXT client that is <tt>ready</tt>.</li>
+     *   <li>The authenticated client sends random data to the server and checks that
the data is echoed
+     *       back by the test server (ie, not Kafka request-response) to ensure that the
client now
+     *       behaves exactly as a regular SASL_PLAINTEXT client that has completed authentication.</li>
+     * </ol>
+     */
+    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol)
throws Exception {
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Create non-SASL connection to manually authenticate after ApiVersionsRequest
+        String node = "1";
+        SecurityProtocol clientProtocol;
+        switch (securityProtocol) {
+            case SASL_PLAINTEXT:
+                clientProtocol = SecurityProtocol.PLAINTEXT;
+                break;
+            case SASL_SSL:
+                clientProtocol = SecurityProtocol.SSL;
+                break;
+            default:
+                throw new IllegalArgumentException("Server protocol " + securityProtocol
+ " is not SASL");
+        }
+        createClientConnection(clientProtocol, node);
+        NetworkTestUtils.waitForChannelReady(selector, node);
+
+        // Send ApiVersionsRequest and check response
+        ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node);
+        assertEquals(Protocol.MIN_VERSIONS[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
+        assertEquals(Protocol.CURR_VERSION[ApiKeys.SASL_HANDSHAKE.id], versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+
+        // Send SaslHandshakeRequest and check response
+        SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
+        assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());
+
+        // Authenticate using PLAIN username/password
+        String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
+        selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));
+        waitForResponse();
+
+        // Check send/receive on the manually authenticated connection
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
+
     private TestJaasConfig configureMechanisms(String clientMechanism, List<String>
serverMechanisms) {
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
         saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms);
@@ -396,13 +519,35 @@ public class SaslAuthenticatorTest {
         selector = null;
     }
 
-    private void sendHandshakeRequest(String node) throws Exception {
-        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, "someclient",
1);
+    private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse
request) throws IOException {
+        RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1);
+        selector.send(new NetworkSend(node, RequestSend.serialize(header, request.toStruct())));
+        ByteBuffer responseBuffer = waitForResponse();
+        return NetworkClient.parseResponse(responseBuffer, header);
+    }
+
+    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws
Exception {
         SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
-        selector.send(new NetworkSend(node, RequestSend.serialize(header, handshakeRequest.toStruct())));
+        Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE,
handshakeRequest);
+        SaslHandshakeResponse response = new SaslHandshakeResponse(responseStruct);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        return response;
+    }
+
+    private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception
{
+        ApiVersionsRequest handshakeRequest = new ApiVersionsRequest();
+        Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS,
handshakeRequest);
+        ApiVersionsResponse response = new ApiVersionsResponse(responseStruct);
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        return response;
+    }
+
+    private ByteBuffer waitForResponse() throws IOException {
         int waitSeconds = 10;
         do {
             selector.poll(1000);
-        } while (selector.completedSends().isEmpty() && waitSeconds-- > 0);
+        } while (selector.completedReceives().isEmpty() && waitSeconds-- > 0);
+        assertEquals(1, selector.completedReceives().size());
+        return selector.completedReceives().get(0).payload();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67d46fc..cf7814e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -47,16 +47,6 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 
-object KafkaApis {
-  val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava)
-
-  private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] = {
-    ApiKeys.values.map(apiKey =>
-      apiKey.id -> new ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id),
Protocol.CURR_VERSION(apiKey.id))).toMap
-  }
-}
-
-
 /**
  * Logic to handle the various Kafka requests
  */
@@ -1041,7 +1031,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id)
&&
                                               request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
     val responseBody = if (isApiVersionsRequestVersionSupported)
-      KafkaApis.apiVersionsResponse
+      ApiVersionsResponse.apiVersionsResponse
     else
       ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
     requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId,
responseHeader, responseBody)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index ed59930..8bf4d73 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -25,16 +25,10 @@ import org.junit.Test
 
 import scala.collection.JavaConversions._
 
-class ApiVersionsRequestTest extends BaseRequestTest {
-
-  override def numBrokers: Int = 1
-
-  @Test
-  def testApiVersionsRequest() {
-    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0)
-
+object ApiVersionsRequestTest {
+  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse) {
     assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.",
ApiKeys.values.length, apiVersionsResponse.apiVersions.size)
-    for (expectedApiVersion: ApiVersion <- KafkaApis.apiVersionsResponse.apiVersions)
{
+    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.apiVersionsResponse.apiVersions)
{
       val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
       assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not
received in ApiVersionsResponse.", actualApiVersion)
       assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey,
actualApiVersion.apiKey)
@@ -42,6 +36,17 @@ class ApiVersionsRequestTest extends BaseRequestTest {
       assertEquals(s"Received unexpected max version for API key ${actualApiVersion.apiKey}.",
expectedApiVersion.maxVersion, actualApiVersion.maxVersion)
     }
   }
+}
+
+class ApiVersionsRequestTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+
+  @Test
+  def testApiVersionsRequest() {
+    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, 0)
+    ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
+  }
 
   private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse
= {
     val response = send(request, ApiKeys.API_VERSIONS, version)

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index 4429f26..177b509 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -17,7 +17,7 @@
 
 package unit.kafka.server
 
-import kafka.server.KafkaApis
+import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.common.protocol.{Protocol, ApiKeys}
 import org.junit.Assert._
 import org.junit.Test
@@ -26,11 +26,11 @@ class ApiVersionsTest {
 
   @Test
   def testApiVersions {
-    val apiVersions = KafkaApis.apiVersionsResponse.apiVersions
+    val apiVersions = ApiVersionsResponse.apiVersionsResponse.apiVersions
     assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)
 
     for (key <- ApiKeys.values) {
-      val version = KafkaApis.apiVersionsResponse.apiVersion(key.id)
+      val version = ApiVersionsResponse.apiVersionsResponse.apiVersion(key.id)
       assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
       assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id))
       assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id))

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index d92ccea..906c4b2 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -39,7 +39,9 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   protected def propertyOverrides(properties: Properties) {}
 
   def generateConfigs() = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown
= false)
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown
= false,
+      interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = saslProperties)
     props.foreach(propertyOverrides)
     props.map(KafkaConfig.fromProps)
   }
@@ -57,7 +59,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
   }
 
-  private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT):
Socket = {
+  def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT):
Socket = {
     new Socket("localhost", s.boundPort(protocol))
   }
 
@@ -76,20 +78,24 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     response
   }
 
-  private def requestAndReceive(request: Array[Byte]): Array[Byte] = {
-    val plainSocket = connect()
+  def requestAndReceive(socket: Socket, request: Array[Byte]): Array[Byte] = {
+    sendRequest(socket, request)
+    receiveResponse(socket)
+  }
+
+  def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = {
+    val socket = connect()
     try {
-      sendRequest(plainSocket, request)
-      receiveResponse(plainSocket)
+      send(socket, request, apiKey, version)
     } finally {
-      plainSocket.close()
+      socket.close()
     }
   }
 
   /**
     * Serializes and send the request to the given api. A ByteBuffer containing the response
is returned.
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer = {
+  def send(socket: Socket, request: AbstractRequest, apiKey: ApiKeys, version: Short): ByteBuffer
= {
     correlationId += 1
     val serializedBytes = {
       val header = new RequestHeader(apiKey.id, version, "", correlationId)
@@ -99,7 +105,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
       byteBuffer.array()
     }
 
-    val response = requestAndReceive(serializedBytes)
+    val response = requestAndReceive(socket, serializedBytes)
 
     val responseBuffer = ByteBuffer.wrap(response)
     ResponseHeader.parse(responseBuffer) // Parse the header to ensure its valid and move
the buffer forward

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d9a669/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
new file mode 100644
index 0000000..632665a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -0,0 +1,78 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.io.IOException
+import java.net.Socket
+import java.util.Collections
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
+import org.apache.kafka.common.requests.SaslHandshakeRequest
+import org.apache.kafka.common.requests.SaslHandshakeResponse
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Test
+import org.junit.Assert._
+import kafka.api.SaslTestHarness
+
+class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected val kafkaClientSaslMechanism = "PLAIN"
+  override protected val kafkaServerSaslMechanisms = List("PLAIN")
+  override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism,
Some(kafkaServerSaslMechanisms)))
+  override protected val zkSaslEnabled = false
+  override def numBrokers = 1
+
+  @Test
+  def testApiVersionsRequestBeforeSaslHandshakeRequest() {
+    val plaintextSocket = connect(protocol = securityProtocol)
+    try {
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
0)
+      ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
+      sendSaslHandshakeRequestValidateResponse(plaintextSocket)
+    } finally {
+      plaintextSocket.close()
+    }
+  }
+
+  @Test
+  def testApiVersionsRequestAfterSaslHandshakeRequest() {
+    val plaintextSocket = connect(protocol = securityProtocol)
+    try {
+      sendSaslHandshakeRequestValidateResponse(plaintextSocket)
+      try {
+        sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
+        fail("Versions Request during Sasl handshake did not fail")
+      } catch {
+        case ioe: IOException => // expected exception
+      }
+    } finally {
+      plaintextSocket.close()
+    }
+  }
+
+  private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version:
Short): ApiVersionsResponse = {
+    val response = send(socket, request, ApiKeys.API_VERSIONS, version)
+    ApiVersionsResponse.parse(response)
+  }
+
+  private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
+    val response = send(socket, new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE,
0)
+    val handshakeResponse = SaslHandshakeResponse.parse(response)
+    assertEquals(Errors.NONE.code, handshakeResponse.errorCode())
+    assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms())
+  }
+}


Mime
View raw message