pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #884: Restrict partitioned-metadata rest-api access for cpp-client older than 1.21
Date Thu, 01 Jan 1970 00:00:00 GMT
rdhabalia closed pull request #884: Restrict partitioned-metadata rest-api access for cpp-client older than 1.21
URL: https://github.com/apache/incubator-pulsar/pull/884
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e917d7e3b..6c4c9bc6c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -105,6 +105,7 @@
     private int defaultNumberOfNamespaceBundles = 4;
 
     // Enable check for minimum allowed client library version
+    @FieldContext(dynamic = true)
     private boolean clientLibraryVersionCheckEnabled = false;
     // Allow client libraries with no version information
     private boolean clientLibraryVersionCheckAllowUnversioned = true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index a0ca6a106..50ff2c2ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -65,6 +65,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -113,6 +114,7 @@
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import com.github.zafarkhaja.semver.Version;
 
 /**
  */
@@ -126,6 +128,8 @@
 
     protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
     private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
+    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
+    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1,21);
 
     @GET
     @Path("/{property}/{cluster}/{namespace}")
@@ -470,7 +474,11 @@ public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property")
St
             @PathParam("destination") @Encoded String destination,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         destination = decode(destination);
-       return getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
+        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(property, cluster,
namespace, destination, authoritative);
+        if (metadata.partitions > 1) {
+            validateClientVersion();
+        }
+        return metadata;
     }
 
     @DELETE
@@ -1460,4 +1468,41 @@ protected void unloadTopic(DestinationName destination, boolean authoritative)
{
             throw new RestException(e.getCause());
         }
     }
+
+    // as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
+    // So, all requests from old-cpp-client (< v1.21) must be rejected.
+    // Pulsar client-java lib always passes user-agent as X-Java-$version.
+    // However, cpp-client older than v1.20 (PR #765) never used to pass it.
+    // So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected
+    private void validateClientVersion() {
+        if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
+            return;
+        }
+        final String userAgent = httpRequest.getHeader("User-Agent");
+        if (StringUtils.isBlank(userAgent)) {
+            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Client lib is not compatible to access partitioned metadata: version
in user-agent is not present");
+        }
+        // Version < 1.21 for cpp-client is not allowed
+        if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
+            try {
+                // Version < 1.21 for cpp-client is not allowed
+                String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
+                String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.")
: null;
+                if (splits != null && splits.length > 1) {
+                    if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0])
+                            || LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() >
Integer.parseInt(splits[1])) {
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Client lib is not compatible to access partitioned metadata:
version " + userAgent
+                                        + " is not supported");
+                    }
+                }
+            } catch (RestException re) {
+                throw re;
+            } catch (Exception e) {
+                log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent);
+            }
+        }
+        return;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 229d58444..c3c239b74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -22,11 +22,17 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
@@ -37,6 +43,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -60,37 +68,40 @@
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.discovery.service.DiscoveryService;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
 public class BrokerServiceLookupTest extends ProducerConsumerBase {
@@ -342,11 +353,11 @@ public void testPartitionTopicLookup() throws Exception {
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < 20; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
-            Assert.assertNotNull(msg, "Message should not be null");
+            assertNotNull(msg, "Message should not be null");
             consumer.acknowledge(msg);
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
-            Assert.assertTrue(messageSet.add(receivedMessage), "Message " + receivedMessage
+ " already received");
+            assertTrue(messageSet.add(receivedMessage), "Message " + receivedMessage + "
already received");
         }
 
         producer.close();
@@ -443,11 +454,11 @@ public void testWebserviceServiceTls() throws Exception {
 		con.connect();
 		log.info("connected url: {} ", con.getURL());
 		// assert connect-url: broker2-https
-		Assert.assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls());
+		assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls());
 		InputStream is = con.getInputStream();
 		// assert redirect-url: broker1-https only
 		log.info("redirected url: {}", con.getURL());
-		Assert.assertEquals(con.getURL().getPort(), conf.getWebServicePortTls());
+		assertEquals(con.getURL().getPort(), conf.getWebServicePortTls());
 		is.close();
 
 		pulsarClient2.close();
@@ -689,7 +700,7 @@ public void start() throws PulsarClientException {
         try {
             pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name",
                     new ConsumerConfiguration());
-            Assert.fail("should have failed due to authentication");
+            fail("should have failed due to authentication");
         } catch (PulsarClientException e) {
             // Ok: expected
         }
@@ -739,10 +750,10 @@ public void start() throws PulsarClientException {
         try {
             pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name",
                     new ConsumerConfiguration());
-            Assert.fail("should have failed due to authentication");
+            fail("should have failed due to authentication");
         } catch (PulsarClientException e) {
             // Ok: expected
-            Assert.assertTrue(e instanceof PulsarClientException.LookupException);
+            assertTrue(e instanceof PulsarClientException.LookupException);
         }
     }
 
@@ -812,11 +823,11 @@ public void testSplitUnloadLookupTest() throws Exception {
 
         // (4) Broker-1 will own topic-1
         final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
-        Assert.assertTrue(serviceUnits1.contains(unsplitBundle));
+        assertTrue(serviceUnits1.contains(unsplitBundle));
         // broker-2 should have this bundle into the cache
         DestinationName destination = DestinationName.get(topic1);
         NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination);
-        Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);
+        assertEquals(bundleInBroker2.toString(), unsplitBundle);
 
         // (5) Split the bundle for topic-1
         admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true);
@@ -837,7 +848,7 @@ public void testSplitUnloadLookupTest() throws Exception {
 
         NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
                 .getBundle(DestinationName.get(topic2));
-        Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));
+        assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));
 
         consumer1.close();
         consumer2.close();
@@ -918,11 +929,11 @@ public void testModularLoadManagerSplitBundle() throws Exception {
 
             // (4) Broker-1 will own topic-1
             final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
-            Assert.assertTrue(serviceUnits1.contains(unsplitBundle));
+            assertTrue(serviceUnits1.contains(unsplitBundle));
             // broker-2 should have this bundle into the cache
             DestinationName destination = DestinationName.get(topic1);
             NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination);
-            Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);
+            assertEquals(bundleInBroker2.toString(), unsplitBundle);
 
             // update broker-1 bundle report to zk
             pulsar.getBrokerService().updateRates();
@@ -963,7 +974,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
 
             NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
                     .getBundle(DestinationName.get(topic2));
-            Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));
+            assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));
 
             consumer1.close();
             consumer2.close();
@@ -974,7 +985,143 @@ public void testModularLoadManagerSplitBundle() throws Exception {
         }
 
     }
-    
+
+    @Test
+    public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {
+
+        final String cluster = "use2";
+        final String property = "my-property2";
+        final String namespace = "my-ns";
+        final String topicName = "my-partitioned";
+        final int totalPartitions = 10;
+        final DestinationName dest = DestinationName.get("persistent", property, cluster,
namespace, topicName);
+        admin.clusters().createCluster(cluster,
+                new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT, null, null,
null));
+        admin.properties().createProperty(property,
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
+        admin.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
+        admin.persistentTopics().createPartitionedTopic(dest.toString(), totalPartitions);
+
+        stopBroker();
+        conf.setClientLibraryVersionCheckEnabled(true);
+        startBroker();
+
+        URI brokerServiceUrl = new URI(pulsar.getWebServiceAddress());
+
+        URL url = brokerServiceUrl.toURL();
+        String path = String.format("admin/%s/partitions", dest.getLookupName());
+
+        AsyncHttpClient httpClient = getHttpClient("Pulsar-Java-1.20");
+        PartitionedTopicMetadata metadata = getPartitionedMetadata(httpClient, url, path);
+        assertEquals(metadata.partitions, totalPartitions);
+        httpClient.close();
+
+        httpClient = getHttpClient("Pulsar-CPP-v1.21");
+        metadata = getPartitionedMetadata(httpClient, url, path);
+        assertEquals(metadata.partitions, totalPartitions);
+        httpClient.close();
+
+        httpClient = getHttpClient("Pulsar-CPP-v1.21-SNAPSHOT");
+        metadata = getPartitionedMetadata(httpClient, url, path);
+        assertEquals(metadata.partitions, totalPartitions);
+        httpClient.close();
+
+        httpClient = getHttpClient("");
+        try {
+            metadata = getPartitionedMetadata(httpClient, url, path);
+            fail("should have failed due to invalid version");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException);
+        }
+        httpClient.close();
+
+        httpClient = getHttpClient("Pulsar-CPP-v1.20-SNAPSHOT");
+        try {
+            metadata = getPartitionedMetadata(httpClient, url, path);
+            fail("should have failed due to invalid version");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException);
+        }
+        httpClient.close();
+
+        httpClient = getHttpClient("Pulsar-CPP-v1.20");
+        try {
+            metadata = getPartitionedMetadata(httpClient, url, path);
+            fail("should have failed due to invalid version");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException);
+        }
+        httpClient.close();
+    }
+
+    private PartitionedTopicMetadata getPartitionedMetadata(AsyncHttpClient httpClient, URL
url, String path)
+            throws Exception {
+        final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+        try {
+
+            String requestUrl = new URL(url, path).toString();
+            BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
+
+            final ListenableFuture<Response> responseFuture = builder.setHeader("Accept",
"application/json")
+                    .execute(new AsyncCompletionHandler<Response>() {
+
+                        @Override
+                        public Response onCompleted(Response response) throws Exception {
+                            return response;
+                        }
+
+                        @Override
+                        public void onThrowable(Throwable t) {
+                            log.warn("[{}] Failed to perform http request: {}", requestUrl,
t.getMessage());
+                            future.completeExceptionally(new PulsarClientException(t));
+                        }
+                    });
+
+            responseFuture.addListener(() -> {
+                try {
+                    Response response = responseFuture.get();
+                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
+                        log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusText());
+                        future.completeExceptionally(
+                                new PulsarClientException("HTTP get request failed: " + response.getStatusText()));
+                        return;
+                    }
+
+                    PartitionedTopicMetadata data = ObjectMapperFactory.getThreadLocal()
+                            .readValue(response.getResponseBodyAsBytes(), PartitionedTopicMetadata.class);
+                    future.complete(data);
+                } catch (Exception e) {
+                    log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
+                    future.completeExceptionally(new PulsarClientException(e));
+                }
+            }, MoreExecutors.sameThreadExecutor());
+
+        } catch (Exception e) {
+            log.warn("[{}] Failed to get authentication data for lookup: {}", path, e.getMessage());
+            if (e instanceof PulsarClientException) {
+                future.completeExceptionally(e);
+            } else {
+                future.completeExceptionally(new PulsarClientException(e));
+            }
+        }
+        return future.get();
+    }
+
+    private AsyncHttpClient getHttpClient(String version) {
+        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+        confBuilder.setFollowRedirect(true);
+        confBuilder.setUserAgent(version);
+        confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
+            @Override
+            public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse
response) {
+                // Close connection upon a server error or per HTTP spec
+                return (response.getStatus().code() / 100 != 5) && super.keepAlive(ahcRequest,
request, response);
+            }
+        });
+        AsyncHttpClientConfig config = confBuilder.build();
+        return new DefaultAsyncHttpClient(config);
+    }
+
     /**** helper classes ****/
 
     public static class MockAuthenticationProvider implements AuthenticationProvider {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message