Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D6EA4200D44 for ; Tue, 21 Nov 2017 00:00:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D53F3160BF9; Mon, 20 Nov 2017 23:00:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A5BAD160BE1 for ; Tue, 21 Nov 2017 00:00:18 +0100 (CET) Received: (qmail 61234 invoked by uid 500); 20 Nov 2017 23:00:17 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 61225 invoked by uid 99); 20 Nov 2017 23:00:17 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Nov 2017 23:00:17 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] rdhabalia closed pull request #884: Restrict partitioned-metadata rest-api access for cpp-client older than 1.21 Message-ID: <151121881715.27544.15427530439450184485.gitbox@gitbox.apache.org> archived-at: Mon, 20 Nov 2017 23:00:20 -0000 rdhabalia closed pull request #884: Restrict partitioned-metadata rest-api access for cpp-client older than 1.21 URL: https://github.com/apache/incubator-pulsar/pull/884 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e917d7e3b..6c4c9bc6c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -105,6 +105,7 @@ private int defaultNumberOfNamespaceBundles = 4; // Enable check for minimum allowed client library version + @FieldContext(dynamic = true) private boolean clientLibraryVersionCheckEnabled = false; // Allow client libraries with no version information private boolean clientLibraryVersionCheckAllowUnversioned = true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index a0ca6a106..50ff2c2ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -65,6 +65,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -113,6 +114,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import com.github.zafarkhaja.semver.Version; /** */ @@ -126,6 +128,8 @@ protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000; private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10; + private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v"; + private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1,21); @GET @Path("/{property}/{cluster}/{namespace}") @@ -470,7 +474,11 @@ public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") St @PathParam("destination") @Encoded String destination, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { destination = decode(destination); - return getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); + if (metadata.partitions > 1) { + validateClientVersion(); + } + return metadata; } @DELETE @@ -1460,4 +1468,41 @@ protected void unloadTopic(DestinationName destination, boolean authoritative) { throw new RestException(e.getCause()); } } + + // as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic. + // So, all requests from old-cpp-client (< v1.21) must be rejected. + // Pulsar client-java lib always passes user-agent as X-Java-$version. + // However, cpp-client older than v1.20 (PR #765) never used to pass it. + // So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected + private void validateClientVersion() { + if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) { + return; + } + final String userAgent = httpRequest.getHeader("User-Agent"); + if (StringUtils.isBlank(userAgent)) { + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Client lib is not compatible to access partitioned metadata: version in user-agent is not present"); + } + // Version < 1.20 for cpp-client is not allowed + if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) { + try { + // Version < 1.20 for cpp-client is not allowed + String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX); + String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null; + if (splits != null && splits.length > 1) { + if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0]) + || LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(splits[1])) { + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Client lib is not compatible to access partitioned metadata: version " + userAgent + + " is not supported"); + } + } + } catch (RestException re) { + throw re; + } catch (Exception e) { + log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent); + } + } + return; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 229d58444..c3c239b74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -22,11 +22,17 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; import java.net.URLConnection; @@ -37,6 +43,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -60,37 +68,40 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationDataProvider; -import org.apache.pulsar.client.api.ClientConfiguration; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.discovery.service.DiscoveryService; import org.apache.pulsar.discovery.service.server.ServiceConfig; +import org.asynchttpclient.AsyncCompletionHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; public class BrokerServiceLookupTest extends ProducerConsumerBase { @@ -342,11 +353,11 @@ public void testPartitionTopicLookup() throws Exception { Set messageSet = Sets.newHashSet(); for (int i = 0; i < 20; i++) { msg = consumer.receive(5, TimeUnit.SECONDS); - Assert.assertNotNull(msg, "Message should not be null"); + assertNotNull(msg, "Message should not be null"); consumer.acknowledge(msg); String receivedMessage = new String(msg.getData()); log.debug("Received message: [{}]", receivedMessage); - Assert.assertTrue(messageSet.add(receivedMessage), "Message " + receivedMessage + " already received"); + assertTrue(messageSet.add(receivedMessage), "Message " + receivedMessage + " already received"); } producer.close(); @@ -443,11 +454,11 @@ public void testWebserviceServiceTls() throws Exception { con.connect(); log.info("connected url: {} ", con.getURL()); // assert connect-url: broker2-https - Assert.assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls()); + assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls()); InputStream is = con.getInputStream(); // assert redirect-url: broker1-https only log.info("redirected url: {}", con.getURL()); - Assert.assertEquals(con.getURL().getPort(), conf.getWebServicePortTls()); + assertEquals(con.getURL().getPort(), conf.getWebServicePortTls()); is.close(); pulsarClient2.close(); @@ -689,7 +700,7 @@ public void start() throws PulsarClientException { try { pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); - Assert.fail("should have failed due to authentication"); + fail("should have failed due to authentication"); } catch (PulsarClientException e) { // Ok: expected } @@ -739,10 +750,10 @@ public void start() throws PulsarClientException { try { pulsarClient.subscribe("persistent://my-property/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); - Assert.fail("should have failed due to authentication"); + fail("should have failed due to authentication"); } catch (PulsarClientException e) { // Ok: expected - Assert.assertTrue(e instanceof PulsarClientException.LookupException); + assertTrue(e instanceof PulsarClientException.LookupException); } } @@ -812,11 +823,11 @@ public void testSplitUnloadLookupTest() throws Exception { // (4) Broker-1 will own topic-1 final String unsplitBundle = namespace + "/0x00000000_0xffffffff"; - Assert.assertTrue(serviceUnits1.contains(unsplitBundle)); + assertTrue(serviceUnits1.contains(unsplitBundle)); // broker-2 should have this bundle into the cache DestinationName destination = DestinationName.get(topic1); NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination); - Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle); + assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); @@ -837,7 +848,7 @@ public void testSplitUnloadLookupTest() throws Exception { NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() .getBundle(DestinationName.get(topic2)); - Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); + assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); consumer1.close(); consumer2.close(); @@ -918,11 +929,11 @@ public void testModularLoadManagerSplitBundle() throws Exception { // (4) Broker-1 will own topic-1 final String unsplitBundle = namespace + "/0x00000000_0xffffffff"; - Assert.assertTrue(serviceUnits1.contains(unsplitBundle)); + assertTrue(serviceUnits1.contains(unsplitBundle)); // broker-2 should have this bundle into the cache DestinationName destination = DestinationName.get(topic1); NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination); - Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle); + assertEquals(bundleInBroker2.toString(), unsplitBundle); // update broker-1 bundle report to zk pulsar.getBrokerService().updateRates(); @@ -963,7 +974,7 @@ public void testModularLoadManagerSplitBundle() throws Exception { NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() .getBundle(DestinationName.get(topic2)); - Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); + assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); consumer1.close(); consumer2.close(); @@ -974,7 +985,143 @@ public void testModularLoadManagerSplitBundle() throws Exception { } } - + + @Test + public void testPartitionedMetadataWithDeprecatedVersion() throws Exception { + + final String cluster = "use2"; + final String property = "my-property2"; + final String namespace = "my-ns"; + final String topicName = "my-partitioned"; + final int totalPartitions = 10; + final DestinationName dest = DestinationName.get("persistent", property, cluster, namespace, topicName); + admin.clusters().createCluster(cluster, + new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT, null, null, null)); + admin.properties().createProperty(property, + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster))); + admin.namespaces().createNamespace(property + "/" + cluster + "/" + namespace); + admin.persistentTopics().createPartitionedTopic(dest.toString(), totalPartitions); + + stopBroker(); + conf.setClientLibraryVersionCheckEnabled(true); + startBroker(); + + URI brokerServiceUrl = new URI(pulsar.getWebServiceAddress()); + + URL url = brokerServiceUrl.toURL(); + String path = String.format("admin/%s/partitions", dest.getLookupName()); + + AsyncHttpClient httpClient = getHttpClient("Pulsar-Java-1.20"); + PartitionedTopicMetadata metadata = getPartitionedMetadata(httpClient, url, path); + assertEquals(metadata.partitions, totalPartitions); + httpClient.close(); + + httpClient = getHttpClient("Pulsar-CPP-v1.21"); + metadata = getPartitionedMetadata(httpClient, url, path); + assertEquals(metadata.partitions, totalPartitions); + httpClient.close(); + + httpClient = getHttpClient("Pulsar-CPP-v1.21-SNAPSHOT"); + metadata = getPartitionedMetadata(httpClient, url, path); + assertEquals(metadata.partitions, totalPartitions); + httpClient.close(); + + httpClient = getHttpClient(""); + try { + metadata = getPartitionedMetadata(httpClient, url, path); + fail("should have failed due to invalid version"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException); + } + httpClient.close(); + + httpClient = getHttpClient("Pulsar-CPP-v1.20-SNAPSHOT"); + try { + metadata = getPartitionedMetadata(httpClient, url, path); + fail("should have failed due to invalid version"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException); + } + httpClient.close(); + + httpClient = getHttpClient("Pulsar-CPP-v1.20"); + try { + metadata = getPartitionedMetadata(httpClient, url, path); + fail("should have failed due to invalid version"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException); + } + httpClient.close(); + } + + private PartitionedTopicMetadata getPartitionedMetadata(AsyncHttpClient httpClient, URL url, String path) + throws Exception { + final CompletableFuture future = new CompletableFuture<>(); + try { + + String requestUrl = new URL(url, path).toString(); + BoundRequestBuilder builder = httpClient.prepareGet(requestUrl); + + final ListenableFuture responseFuture = builder.setHeader("Accept", "application/json") + .execute(new AsyncCompletionHandler() { + + @Override + public Response onCompleted(Response response) throws Exception { + return response; + } + + @Override + public void onThrowable(Throwable t) { + log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage()); + future.completeExceptionally(new PulsarClientException(t)); + } + }); + + responseFuture.addListener(() -> { + try { + Response response = responseFuture.get(); + if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { + log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusText()); + future.completeExceptionally( + new PulsarClientException("HTTP get request failed: " + response.getStatusText())); + return; + } + + PartitionedTopicMetadata data = ObjectMapperFactory.getThreadLocal() + .readValue(response.getResponseBodyAsBytes(), PartitionedTopicMetadata.class); + future.complete(data); + } catch (Exception e) { + log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage()); + future.completeExceptionally(new PulsarClientException(e)); + } + }, MoreExecutors.sameThreadExecutor()); + + } catch (Exception e) { + log.warn("[{}] Failed to get authentication data for lookup: {}", path, e.getMessage()); + if (e instanceof PulsarClientException) { + future.completeExceptionally(e); + } else { + future.completeExceptionally(new PulsarClientException(e)); + } + } + return future.get(); + } + + private AsyncHttpClient getHttpClient(String version) { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setFollowRedirect(true); + confBuilder.setUserAgent(version); + confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() { + @Override + public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) { + // Close connection upon a server error or per HTTP spec + return (response.getStatus().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response); + } + }); + AsyncHttpClientConfig config = confBuilder.build(); + return new DefaultAsyncHttpClient(config); + } + /**** helper classes ****/ public static class MockAuthenticationProvider implements AuthenticationProvider { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services