pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] nkurihar closed pull request #901: Introduce configuration converter
Date Thu, 01 Jan 1970 00:00:00 GMT
nkurihar closed pull request #901: Introduce configuration converter
URL: https://github.com/apache/incubator-pulsar/pull/901
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e917d7e3b..27eff7f06 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 7aebaa445..b9e0321fc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.configuration;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.common.util.FieldParser.update;
 
@@ -25,6 +27,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
@@ -127,4 +130,44 @@ public static boolean isComplete(Object obj) throws IllegalArgumentException, Il
         return true;
     }
 
+    /**
+     * Converts a PulsarConfiguration object to a ServiceConfiguration object.
+     *
+     * @param conf
+     * @param ignoreNonExistMember
+     * @return
+     * @throws IllegalArgumentException
+     *             if conf has the field whose name is not contained in ServiceConfiguration and ignoreNonExistMember is false.
+     * @throws RuntimeException
+     */
+    public static ServiceConfiguration convertFrom(PulsarConfiguration conf, boolean ignoreNonExistMember) throws RuntimeException {
+        try {
+            final ServiceConfiguration convertedConf = ServiceConfiguration.class.newInstance();
+            Field[] confFields = conf.getClass().getDeclaredFields();
+            Arrays.stream(confFields).forEach(confField -> {
+                try {
+                    Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
+                    confField.setAccessible(true);
+                    convertedConfField.setAccessible(true);
+                    convertedConfField.set(convertedConf, confField.get(conf));
+                } catch (NoSuchFieldException e) {
+                    if (!ignoreNonExistMember) {
+                        throw new IllegalArgumentException("Exception caused while converting configuration: " + e.getMessage());
+                    }
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage());
+                }
+            });
+            return convertedConf;
+        } catch (InstantiationException e) {
+            throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage());
+        }
+    }
+
+    public static ServiceConfiguration convertFrom(PulsarConfiguration conf) throws RuntimeException {
+        return convertFrom(conf, true);
+    }
+
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 1b1673355..5ff95dc44 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -33,11 +33,52 @@
 import java.util.Properties;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.configuration.FieldContext;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.testng.annotations.Test;
 
 public class PulsarConfigurationLoaderTest {
+    public class MockConfiguration implements PulsarConfiguration {
+        private Properties properties = new Properties();
+
+        private String zookeeperServers = "localhost:2181";
+        private String globalZookeeperServers = "localhost:2184";
+        private int brokerServicePort = 7650;
+        private int brokerServicePortTls = 7651;
+        private int webServicePort = 9080;
+        private int webServicePortTls = 9443;
+        private int notExistFieldInServiceConfig = 0;
+
+        @Override
+        public Properties getProperties() {
+            return properties;
+        }
+
+        @Override
+        public void setProperties(Properties properties) {
+            this.properties = properties;
+        }
+    }
+
+    @Test
+    public void testConfigurationConverting() throws Exception {
+        MockConfiguration mockConfiguration = new MockConfiguration();
+        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(mockConfiguration);
+
+        // check whether converting correctly
+        assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181");
+        assertEquals(serviceConfiguration.getGlobalZookeeperServers(), "localhost:2184");
+        assertEquals(serviceConfiguration.getBrokerServicePort(), 7650);
+        assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651);
+        assertEquals(serviceConfiguration.getWebServicePort(), 9080);
+        assertEquals(serviceConfiguration.getWebServicePortTls(), 9443);
+
+        // check whether exception causes
+        try {
+            PulsarConfigurationLoader.convertFrom(mockConfiguration, false);
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getClass(), IllegalArgumentException.class);
+        }
+    }
 
     @Test
     public void testPulsarConfiguraitonLoadingStream() throws Exception {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 76148dbad..8af2b0a62 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationManager;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -79,7 +80,7 @@ public DiscoveryService(ServiceConfig serviceConfig) {
     public void start() throws Exception {
         discoveryProvider = new BrokerDiscoveryProvider(this.config, getZooKeeperClientFactory());
         this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
-        ServiceConfiguration serviceConfiguration = createServiceConfiguration(config);
+        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
         authenticationService = new AuthenticationService(serviceConfiguration);
         authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
         startServer();
@@ -132,16 +133,6 @@ public void close() throws IOException {
         workerGroup.shutdownGracefully();
     }
 
-    private ServiceConfiguration createServiceConfiguration(ServiceConfig config) {
-        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
-        serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders());
-        serviceConfiguration.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching());
-        serviceConfiguration.setProperties(config.getProperties());
-        return serviceConfiguration;
-    }
-
     /**
      * Derive the host
      *
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 91e93f769..fed6e15c8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -34,6 +33,7 @@
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -122,7 +122,7 @@ public void shutdown(int exitCode) {
 
         discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
         this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
-        ServiceConfiguration serviceConfiguration = createServiceConfiguration(proxyConfig);
+        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig);
         authenticationService = new AuthenticationService(serviceConfiguration);
         authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
 
@@ -181,15 +181,6 @@ public void close() throws IOException {
         client.close();
     }
 
-    private ServiceConfiguration createServiceConfiguration(ProxyConfiguration config) {
-        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
-        serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders());
-        serviceConfiguration.setProperties(config.getProperties());
-        return serviceConfiguration;
-    }
-
     public String getServiceUrl() {
         return serviceUrl;
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 838a2f1bf..28a0ed511 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -66,8 +67,9 @@
     AuthorizationManager authorizationManager;
     PulsarClient pulsarClient;
 
-    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
-            WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, new DefaultThreadFactory("pulsar-websocket"));
+    private final ScheduledExecutorService executor = Executors
+            .newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS,
+                    new DefaultThreadFactory("pulsar-websocket"));
     private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(
             WebSocketProxyConfiguration.GLOBAL_ZK_THREADS, "pulsar-websocket-ordered");
     private GlobalZooKeeperCache globalZkCache;
@@ -82,7 +84,7 @@
     private final ProxyStats proxyStats;
 
     public WebSocketService(WebSocketProxyConfiguration config) {
-        this(createClusterData(config), createServiceConfiguration(config));
+        this(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
     }
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
@@ -210,31 +212,6 @@ private static ClusterData createClusterData(WebSocketProxyConfiguration config)
         }
     }
 
-    private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) {
-        ServiceConfiguration serviceConfig = new ServiceConfiguration();
-        serviceConfig.setProperties(config.getProperties());
-        serviceConfig.setClusterName(config.getClusterName());
-        serviceConfig.setWebServicePort(config.getWebServicePort());
-        serviceConfig.setWebServicePortTls(config.getWebServicePortTls());
-        serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders());
-        serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
-        serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
-        serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        serviceConfig.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching());
-        serviceConfig.setSuperUserRoles(config.getSuperUserRoles());
-        serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers());
-        serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
-        serviceConfig.setTlsEnabled(config.isTlsEnabled());
-        serviceConfig.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
-        serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
-        serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
-        serviceConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
-        serviceConfig.setWebSocketNumIoThreads(config.getNumIoThreads());
-        serviceConfig.setWebSocketConnectionsPerBroker(config.getConnectionsPerBroker());
-        return serviceConfig;
-    }
-
     private ClusterData retrieveClusterData() throws PulsarServerException {
         if (configurationCacheService == null) {
             throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers");
@@ -307,16 +284,16 @@ public boolean removeConsumer(ConsumerHandler consumer) {
         }
         return false;
     }
-    
+
     public boolean addReader(ReaderHandler reader) {
         return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
                 .add(reader);
     }
-    
+
     public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> getReaders() {
         return topicReaderMap;
     }
-    
+
     public boolean removeReader(ReaderHandler reader) {
         final String topicName = reader.getConsumer().getTopic();
         if (topicReaderMap.containsKey(topicName)) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message