kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3652; Return error response for unsupported version of ApiVersionsRequest
Date Thu, 05 May 2016 01:16:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 21351b575 -> 91130e424


KAFKA-3652; Return error response for unsupported version of ApiVersionsRequest

Handle unsupported version of ApiVersionsRequest during SASL auth as well as normal operation
by returning an error response.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #1310 from rajinisivaram/KAFKA-3652

(cherry picked from commit 64451af9e08de428064dc232cd6dea0ea0b2a81d)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91130e42
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91130e42
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91130e42

Branch: refs/heads/0.10.0
Commit: 91130e4242f8000016a97a0e81a242ac41e5107c
Parents: 21351b5
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Wed May 4 18:16:08 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed May 4 18:16:19 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  6 +-
 .../authenticator/SaslServerAuthenticator.java  | 77 ++++++++++++--------
 .../authenticator/SaslAuthenticatorTest.java    | 62 ++++++++++++++++
 .../scala/kafka/network/RequestChannel.scala    | 13 +++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  4 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |  8 +-
 .../server/SaslApiVersionsRequestTest.scala     | 17 ++++-
 7 files changed, 147 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 99cdbf9..326b780 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -848,6 +848,10 @@ public class Protocol {
         }
     }
 
+    public static boolean apiVersionSupported(short apiKey, short apiVersion) {
+        return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey]
&& apiVersion <= CURR_VERSION[apiKey];
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)
@@ -1008,4 +1012,4 @@ public class Protocol {
         System.out.println(toHtml());
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index a9c19a5..e1074a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
@@ -58,6 +59,7 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractRequestResponse;
@@ -75,7 +77,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     public enum SaslState {
-        HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
+        GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
     }
 
     private final String node;
@@ -85,7 +87,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final String host;
 
     // Current SASL state
-    private SaslState saslState = SaslState.HANDSHAKE_REQUEST;
+    private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
     // Next SASL state to be set when outgoing writes associated with the current SASL state
complete
     private SaslState pendingSaslState = null;
     private SaslServer saslServer;
@@ -215,6 +217,9 @@ public class SaslServerAuthenticator implements Authenticator {
             try {
                 switch (saslState) {
                     case HANDSHAKE_REQUEST:
+                        handleKafkaRequest(clientToken);
+                        break;
+                    case GSSAPI_OR_HANDSHAKE_REQUEST:
                         if (handleKafkaRequest(clientToken))
                             break;
                         // For default GSSAPI, fall through to authenticate using the client
token as the first GSSAPI packet.
@@ -288,39 +293,53 @@ public class SaslServerAuthenticator implements Authenticator {
         try {
             ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
             RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
-            AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(),
requestHeader.apiVersion(), requestBuffer);
+            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+            // A valid Kafka request header was received. SASL authentication tokens are
now expected only
+            // following a SaslHandshakeRequest since this is not a GSSAPI client token from
a Kafka 0.9.0.x client.
+            setSaslState(SaslState.HANDSHAKE_REQUEST);
             isKafkaRequest = true;
 
-            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
-            LOG.debug("Handle Kafka request {}", apiKey);
-            switch (apiKey) {
-                case API_VERSIONS:
-                    handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
-                    break;
-                case SASL_HANDSHAKE:
-                    clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest)
request);
-                    break;
-                default:
-                    throw new IllegalSaslStateException("Unexpected Kafka request of type
" + apiKey + " during SASL handshake.");
+            if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion()))
{
+                if (apiKey == ApiKeys.API_VERSIONS)
+                    sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
+                else
+                    throw new UnsupportedVersionException("Version " + requestHeader.apiVersion()
+ " is not supported for apiKey " + apiKey);
+            } else {
+                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(),
requestHeader.apiVersion(), requestBuffer);
+
+                LOG.debug("Handle Kafka request {}", apiKey);
+                switch (apiKey) {
+                    case API_VERSIONS:
+                        handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+                        break;
+                    case SASL_HANDSHAKE:
+                        clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest)
request);
+                        break;
+                    default:
+                        throw new IllegalSaslStateException("Unexpected Kafka request of
type " + apiKey + " during SASL handshake.");
+                }
             }
         } catch (SchemaException | IllegalArgumentException e) {
-            // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException
is thrown
-            // if the API key is invalid. For compatibility with 0.9.0.x where the first
packet is a GSSAPI token
-            // starting with 0x60, revert to GSSAPI for both these exceptions.
-            if (LOG.isDebugEnabled()) {
-                StringBuilder tokenBuilder = new StringBuilder();
-                for (byte b : requestBytes) {
-                    tokenBuilder.append(String.format("%02x", b));
-                    if (tokenBuilder.length() >= 20)
-                         break;
+            if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
+                // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException
is thrown
+                // if the API key is invalid. For compatibility with 0.9.0.x where the first
packet is a GSSAPI token
+                // starting with 0x60, revert to GSSAPI for both these exceptions.
+                if (LOG.isDebugEnabled()) {
+                    StringBuilder tokenBuilder = new StringBuilder();
+                    for (byte b : requestBytes) {
+                        tokenBuilder.append(String.format("%02x", b));
+                        if (tokenBuilder.length() >= 20)
+                             break;
+                    }
+                    LOG.debug("Received client packet of length {} starting with bytes 0x{},
process as GSSAPI packet", requestBytes.length, tokenBuilder);
                 }
-                LOG.debug("Received client packet of length {} starting with bytes 0x{},
process as GSSAPI packet", requestBytes.length, tokenBuilder);
-            }
-            if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
-                LOG.debug("First client packet is not a SASL mechanism request, using default
mechanism GSSAPI");
-                clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+                if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
+                    LOG.debug("First client packet is not a SASL mechanism request, using
default mechanism GSSAPI");
+                    clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+                } else
+                    throw new UnsupportedSaslMechanismException("Exception handling first
SASL packet from client, GSSAPI is not supported by server", e);
             } else
-                throw new UnsupportedSaslMechanismException("Exception handling first SASL
packet from client, GSSAPI is not supported by server", e);
+                throw e;
         }
         if (clientMechanism != null) {
             createSaslServer(clientMechanism);

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 368b5a7..97fe3d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasUtils;
@@ -244,6 +245,62 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests that unsupported version of ApiVersionsRequest before SASL handshake request
+     * returns error response and does not result in authentication failure. This test
+     * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * where a non-SASL client is used to send requests that are processed by
+     * {@link SaslServerAuthenticator} of the server prior to client authentication.
+     */
+    @Test
+    public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Send ApiVersionsRequest with unsupported version and validate error response.
+        String node = "1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node);
+        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE,
"someclient", 1);
+        selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct())));
+        ByteBuffer responseBuffer = waitForResponse();
+        ResponseHeader.parse(responseBuffer);
+        ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
+        assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+
+        // Send ApiVersionsRequest with a supported version. This should succeed.
+        sendVersionRequestReceiveResponse(node);
+
+        // Test that client can authenticate successfully
+        sendHandshakeRequestReceiveResponse(node);
+        authenticateUsingSaslPlainAndCheckConnection(node);
+    }
+
+    /**
+     * Tests that unsupported version of SASL handshake request returns error
+     * response and fails authentication. This test is similar to
+     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * where a non-SASL client is used to send requests that are processed by
+     * {@link SaslServerAuthenticator} of the server prior to client authentication.
+     */
+    @Test
+    public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Send ApiVersionsRequest and validate error response.
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE,
"someclient", 2);
+        selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct())));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+    }
+
+    /**
      * Tests that any invalid data during Kafka SASL handshake request flow
      * or the actual SASL authentication flow result in authentication failure
      * and do not cause any failures in the server.
@@ -485,6 +542,11 @@ public class SaslAuthenticatorTest {
         SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
         assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());
 
+        // Complete manual authentication and check send/receive succeed
+        authenticateUsingSaslPlainAndCheckConnection(node);
+    }
+
+    private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception
{
         // Authenticate using PLAIN username/password
         String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
         selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 17c5b9b..e2000db 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol, Protocol}
+import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader,
ApiVersionsRequest}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 
@@ -84,8 +84,13 @@ object RequestChannel extends Logging {
         null
     val body: AbstractRequest =
       if (requestObj == null)
-        try AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-        catch {
+        try {
+          // For unsupported version of ApiVersionsRequest, create a dummy request to enable
an error response to be returned later
+          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey,
header.apiVersion))
+            new ApiVersionsRequest
+          else
+            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+        } catch {
           case ex: Throwable =>
             throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey}
and apiVersion: ${header.apiVersion}", ex)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb6358d..086bd4b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1029,9 +1029,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // with client authentication which is performed at an earlier stage of the connection
where the
     // ApiVersionRequest is not available.
     val responseHeader = new ResponseHeader(request.header.correlationId)
-    val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id)
&&
-                                              request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
-    val responseBody = if (isApiVersionsRequestVersionSupported)
+    val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
       ApiVersionsResponse.apiVersionsResponse
     else
       ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 8bf4d73..f2dd60f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.Assert._
@@ -48,6 +48,12 @@ class ApiVersionsRequestTest extends BaseRequestTest {
     ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
   }
 
+  @Test
+  def testApiVersionsRequestWithUnsupportedVersion() {
+    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, Short.MaxValue)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+  }
+
   private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse
= {
     val response = send(request, ApiKeys.API_VERSIONS, version)
     ApiVersionsResponse.parse(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/91130e42/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 632665a..8557008 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -19,11 +19,10 @@ package kafka.server
 import java.io.IOException
 import java.net.Socket
 import java.util.Collections
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.protocol.Errors
 import org.junit.Test
 import org.junit.Assert._
 import kafka.api.SaslTestHarness
@@ -64,6 +63,20 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness
{
     }
   }
 
+  @Test
+  def testApiVersionsRequestWithUnsupportedVersion() {
+    val plaintextSocket = connect(protocol = securityProtocol)
+    try {
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
Short.MaxValue)
+      assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
0)
+      ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
+      sendSaslHandshakeRequestValidateResponse(plaintextSocket)
+    } finally {
+      plaintextSocket.close()
+    }
+  }
+
   private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version:
Short): ApiVersionsResponse = {
     val response = send(socket, request, ApiKeys.API_VERSIONS, version)
     ApiVersionsResponse.parse(response)


Mime
View raw message