pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [pulsar] branch master updated: Refresh Certs every X minutes (#3568)
Date Mon, 18 Feb 2019 22:53:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f7661  Refresh Certs every X minutes (#3568)
a0f7661 is described below

commit a0f7661dd32222beccb0185549bad3f5923b7cb8
Author: Jai Asher <jai1@ccs.neu.edu>
AuthorDate: Mon Feb 18 14:53:14 2019 -0800

    Refresh Certs every X minutes (#3568)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 ++
 .../broker/service/PulsarChannelInitializer.java   | 28 +++----
 .../client/impl/auth/AuthenticationDataTls.java    | 37 +++++++--
 .../pulsar/client/impl/auth/AuthenticationTls.java |  2 +-
 .../common/util/ClientSslContextRefresher.java     | 68 +++++++++++++++++
 .../common/util/FileModifiedTimeUpdater.java       | 65 ++++++++++++++++
 .../common/util/ServerSslContextRefresher.java     | 87 ++++++++++++++++++++++
 .../common/util/FileModifiedTimeUpdaterTest.java   | 68 +++++++++++++++++
 .../pulsar/discovery/service/DiscoveryService.java | 20 +++--
 .../service/ServiceChannelInitializer.java         | 26 ++++---
 .../discovery/service/server/ServiceConfig.java    | 10 +++
 .../discovery/service/DiscoveryServiceTest.java    |  5 --
 .../pulsar/proxy/server/ProxyConfiguration.java    | 11 ++-
 .../proxy/server/ServiceChannelInitializer.java    | 48 ++++++------
 14 files changed, 404 insertions(+), 76 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8e00131..1d697b2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -449,6 +449,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean tlsEnabled = false;
     @FieldContext(
         category = CATEGORY_TLS,
+        doc = "Time Interval in Mins between checks for Cert Refresh."
+    )
+    private long certRefreshCheckDurationInMins = 0;
+    @FieldContext(
+        category = CATEGORY_TLS,
         doc = "Path for the TLS certificate file"
     )
     private String tlsCertificateFilePath;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 4cd5c02..d774bd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -18,23 +18,23 @@
  */
 package org.apache.pulsar.broker.service;
 
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.ServerSslContextRefresher;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     public static final String TLS_HANDLER = "tls";
 
     private final PulsarService pulsar;
-    private final SslContext sslCtx;
+    private final boolean enableTls;
+    private final ServerSslContextRefresher sslCtxRefresher;
 
     /**
      *
@@ -43,21 +43,23 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
     public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception
{
         super();
         this.pulsar = pulsar;
-        if (enableTLS) {
+        this.enableTls = enableTLS;
+        if (this.enableTls) {
             ServiceConfiguration serviceConfig = pulsar.getConfiguration();
-            this.sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(),
+            sslCtxRefresher = new ServerSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
                     serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
                     serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.isTlsRequireTrustedClientCertOnConnect());
+                    serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                    serviceConfig.getCertRefreshCheckDurationInMins());
         } else {
-            this.sslCtx = null;
+            this.sslCtxRefresher = null;
         }
     }
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
-        if (sslCtx != null) {
-            ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+        if (this.enableTls) {
+            ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index f013865..e355672 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -20,15 +20,19 @@ package org.apache.pulsar.client.impl.auth;
 
 import java.security.KeyManagementException;
 import java.security.PrivateKey;
+import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AuthenticationDataTls implements AuthenticationDataProvider {
-
-    protected final X509Certificate[] certificates;
-    protected final PrivateKey privateKey;
+    protected X509Certificate[] tlsCertificates;
+    protected PrivateKey tlsPrivateKey;
+    protected FileModifiedTimeUpdater certFile, keyFile;
 
     public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException
{
         if (certFilePath == null) {
@@ -37,8 +41,10 @@ public class AuthenticationDataTls implements AuthenticationDataProvider
{
         if (keyFilePath == null) {
             throw new IllegalArgumentException("keyFilePath must not be null");
         }
-        certificates = SecurityUtility.loadCertificatesFromPemFile(certFilePath);
-        privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFilePath);
+        this.certFile = new FileModifiedTimeUpdater(certFilePath);
+        this.keyFile = new FileModifiedTimeUpdater(keyFilePath);
+        this.tlsCertificates = SecurityUtility.loadCertificatesFromPemFile(certFilePath);
+        this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFilePath);
     }
 
     /*
@@ -51,13 +57,28 @@ public class AuthenticationDataTls implements AuthenticationDataProvider
{
     }
 
     @Override
-    public X509Certificate[] getTlsCertificates() {
-        return certificates;
+    public Certificate[] getTlsCertificates() {
+        if (this.certFile.checkAndRefresh()) {
+            try {
+                this.tlsCertificates = SecurityUtility.loadCertificatesFromPemFile(certFile.getFileName());
+            } catch (KeyManagementException e) {
+                LOG.error("Unable to refresh authData for cert {}: ", certFile.getFileName(),
e);
+            }
+        }
+        return this.tlsCertificates;
     }
 
     @Override
     public PrivateKey getTlsPrivateKey() {
-        return privateKey;
+        if (this.keyFile.checkAndRefresh()) {
+            try {
+                this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
+            } catch (KeyManagementException e) {
+                LOG.error("Unable to refresh authData for cert {}: ", keyFile.getFileName(),
e);
+            }
+        }
+        return this.tlsPrivateKey;
     }
 
+    private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index a905054..2c1d979 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -49,7 +49,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
     
     public AuthenticationTls() {
     }
-
+    
     public AuthenticationTls(String certFilePath, String keyFilePath) {
         this.certFilePath = certFilePath;
         this.keyFilePath = keyFilePath;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
new file mode 100644
index 0000000..c9b8e5d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.ssl.SslContext;
+
+/**
+ * Supplies the Netty {@link SslContext} used for client connections.
+ *
+ * <p>When the authentication provider carries TLS data, the context is built from the provider's certificates and
+ * private key; otherwise it is built from the trust-certs file alone.
+ */
+public class ClientSslContextRefresher {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientSslContextRefresher.class);
+
+    private volatile SslContext sslContext;
+    private final boolean tlsAllowInsecureConnection;
+    private final String tlsTrustCertsFilePath;
+    private final AuthenticationDataProvider authData;
+
+    /**
+     * Builds the initial context.
+     *
+     * @param allowInsecure passed through to {@code SecurityUtility.createNettySslContextForClient}
+     * @param trustCertsFilePath passed through to {@code SecurityUtility.createNettySslContextForClient}
+     * @param authData authentication data; may be {@code null}
+     * @throws IOException if the initial context cannot be created
+     * @throws GeneralSecurityException if the initial context cannot be created
+     */
+    public ClientSslContextRefresher(boolean allowInsecure, String trustCertsFilePath,
+            AuthenticationDataProvider authData) throws IOException, GeneralSecurityException {
+        this.tlsAllowInsecureConnection = allowInsecure;
+        this.tlsTrustCertsFilePath = trustCertsFilePath;
+        this.authData = authData;
+
+        if (hasTlsAuthData()) {
+            this.sslContext = buildContextFromAuthData();
+        } else {
+            this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                    this.tlsTrustCertsFilePath);
+        }
+    }
+
+    /**
+     * Returns the context to use for a new connection.
+     *
+     * <p>When TLS authentication data is present the context is rebuilt on every call from whatever the provider
+     * currently returns. If the rebuild fails, the error is logged and the previously built context is returned.
+     *
+     * @return the most recently built context
+     */
+    public SslContext get() {
+        if (hasTlsAuthData()) {
+            try {
+                this.sslContext = buildContextFromAuthData();
+            } catch (GeneralSecurityException | IOException e) {
+                LOG.error("Exception occurred while trying to refresh sslContext: ", e);
+            }
+        }
+        return sslContext;
+    }
+
+    /** Tells whether the provider exists and reports that it carries TLS data. */
+    private boolean hasTlsAuthData() {
+        return authData != null && authData.hasDataForTls();
+    }
+
+    /** Creates a context from the provider's current certificates and private key. */
+    private SslContext buildContextFromAuthData() throws GeneralSecurityException, IOException {
+        return SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                this.tlsTrustCertsFilePath, (X509Certificate[]) authData.getTlsCertificates(),
+                authData.getTlsPrivateKey());
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
new file mode 100644
index 0000000..44424a3
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.Getter;
+
+/**
+ * Remembers the last-modified time of one file and reports when a later check sees a different value.
+ */
+public class FileModifiedTimeUpdater {
+    private static final Logger LOG = LoggerFactory.getLogger(FileModifiedTimeUpdater.class);
+
+    @Getter
+    String fileName;
+    @Getter
+    FileTime lastModifiedTime;
+
+    /**
+     * Records the file name and its current last-modified time.
+     *
+     * @param fileName path of the file to watch; when {@code null} no time is recorded
+     */
+    public FileModifiedTimeUpdater(String fileName) {
+        this.fileName = fileName;
+        this.lastModifiedTime = updateLastModifiedTime();
+    }
+
+    /**
+     * Reads the file's last-modified time from the file system.
+     *
+     * @return the current last-modified time, or {@code null} when there is no file name or the read fails
+     */
+    private FileTime updateLastModifiedTime() {
+        if (fileName == null) {
+            return null;
+        }
+        Path target = Paths.get(fileName);
+        try {
+            return Files.getLastModifiedTime(target);
+        } catch (IOException e) {
+            LOG.error("Unable to fetch lastModified time for file {}: ", fileName, e);
+            return null;
+        }
+    }
+
+    /**
+     * Compares the file's current last-modified time with the remembered one and stores the new value if it
+     * differs.
+     *
+     * @return {@code true} when a time could be read and it differs from the remembered one
+     */
+    public boolean checkAndRefresh() {
+        FileTime current = updateLastModifiedTime();
+        boolean modified = current != null && !current.equals(lastModifiedTime);
+        if (modified) {
+            this.lastModifiedTime = current;
+        }
+        return modified;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ServerSslContextRefresher.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ServerSslContextRefresher.java
new file mode 100644
index 0000000..a638454
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ServerSslContextRefresher.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.ssl.SslContext;
+
+/**
+ * Holds the server-side Netty {@link SslContext} and rebuilds it when the trust-certs, certificate or key file
+ * reports a new last-modified time.
+ *
+ * <p>The files are examined only from {@link #get()}, and at most once per refresh interval.
+ */
+public class ServerSslContextRefresher {
+    private static final Logger LOG = LoggerFactory.getLogger(ServerSslContextRefresher.class);
+
+    private final boolean tlsAllowInsecureConnection;
+    private final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
+    private final Set<String> tlsCiphers;
+    private final Set<String> tlsProtocols;
+    private final boolean tlsRequireTrustedClientCertOnConnect;
+    private final long delayInMins;
+    // The refresh interval converted to seconds, the unit used for every clock comparison below.
+    private final long delayInSeconds;
+    // Earliest time, in seconds since the epoch, at which the files are checked again.
+    private long nextRefreshTimeInSeconds;
+    private volatile SslContext sslContext;
+
+    /**
+     * Builds the initial context and schedules the first file check.
+     *
+     * @param allowInsecure passed through to {@code SecurityUtility.createNettySslContextForServer}
+     * @param trustCertsFilePath path of the trust-certs file
+     * @param certificateFilePath path of the certificate file
+     * @param keyFilePath path of the private-key file
+     * @param ciphers passed through to {@code SecurityUtility.createNettySslContextForServer}
+     * @param protocols passed through to {@code SecurityUtility.createNettySslContextForServer}
+     * @param requireTrustedClientCertOnConnect passed through to
+     *            {@code SecurityUtility.createNettySslContextForServer}
+     * @param delayInMins minutes between file checks; with a value of zero or less every {@link #get()} call
+     *            checks the files
+     * @throws SSLException if the initial context cannot be created
+     * @throws FileNotFoundException if the initial context cannot be created
+     * @throws GeneralSecurityException if the initial context cannot be created
+     * @throws IOException if the initial context cannot be created
+     */
+    public ServerSslContextRefresher(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
+            String keyFilePath, Set<String> ciphers, Set<String> protocols,
+            boolean requireTrustedClientCertOnConnect, long delayInMins)
+            throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+        this.tlsAllowInsecureConnection = allowInsecure;
+        this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+        this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
+        this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
+        this.tlsCiphers = ciphers;
+        this.tlsProtocols = protocols;
+        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+        this.delayInMins = delayInMins;
+        // Convert once: the delay is configured in minutes but the clock below is read in seconds.
+        this.delayInSeconds = TimeUnit.MINUTES.toSeconds(delayInMins);
+        long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+        this.nextRefreshTimeInSeconds = nowInSeconds + delayInSeconds;
+
+        buildSSLContext();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Certs will be refreshed every {} minutes", delayInMins);
+        }
+    }
+
+    /**
+     * Creates a new context from the configured files and stores it. The stored context is replaced only when
+     * creation succeeds.
+     *
+     * @throws SSLException if the context cannot be created
+     * @throws FileNotFoundException if the context cannot be created
+     * @throws GeneralSecurityException if the context cannot be created
+     * @throws IOException if the context cannot be created
+     */
+    public void buildSSLContext() throws SSLException, FileNotFoundException, GeneralSecurityException,
+            IOException {
+        this.sslContext = SecurityUtility.createNettySslContextForServer(tlsAllowInsecureConnection,
+                tlsTrustCertsFilePath.getFileName(), tlsCertificateFilePath.getFileName(),
+                tlsKeyFilePath.getFileName(), tlsCiphers, tlsProtocols, tlsRequireTrustedClientCertOnConnect);
+    }
+
+    /**
+     * Returns the current context, first rebuilding it if the refresh interval has elapsed and any of the three
+     * files changed. If the rebuild fails, the error is logged and the previous context is returned.
+     *
+     * @return the most recently built context
+     */
+    public synchronized SslContext get() {
+        long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+        // Check the files once the deadline has been reached, then push the deadline one interval ahead.
+        if (nowInSeconds >= nextRefreshTimeInSeconds) {
+            nextRefreshTimeInSeconds = nowInSeconds + delayInSeconds;
+            // Non-short-circuit '|' on purpose: every file must record its new timestamp in this pass,
+            // otherwise a file skipped now would trigger a second, redundant rebuild at the next check.
+            boolean changed = tlsTrustCertsFilePath.checkAndRefresh()
+                    | tlsCertificateFilePath.checkAndRefresh()
+                    | tlsKeyFilePath.checkAndRefresh();
+            if (changed) {
+                try {
+                    buildSSLContext();
+                } catch (GeneralSecurityException | IOException e) {
+                    LOG.error("Exception while trying to refresh ssl Context: ", e);
+                }
+            }
+        }
+        return this.sslContext;
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FileModifiedTimeUpdaterTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FileModifiedTimeUpdaterTest.java
new file mode 100644
index 0000000..040263c
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FileModifiedTimeUpdaterTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that {@link FileModifiedTimeUpdater} reports a change only when a file's last-modified time differs from
+ * the remembered one.
+ */
+public class FileModifiedTimeUpdaterTest {
+    /**
+     * Supplies the file names used by the tests, located under the JVM's temporary directory so the tests do not
+     * depend on a fixed {@code /tmp} path.
+     */
+    @DataProvider(name = "files")
+    Object[] getFiles() {
+        String tmpDir = System.getProperty("java.io.tmpdir");
+        return new Object[] {
+            Paths.get(tmpDir, "file.ini").toString(),
+            Paths.get(tmpDir, "file.log").toString(),
+            Paths.get(tmpDir, "f3", "notes.txt").toString()
+        };
+    }
+
+    /** A file whose last-modified time moved forward must be reported as changed. */
+    @Test(dataProvider = "files")
+    public void testFileModified(String fileName) throws IOException, InterruptedException {
+        Path path = Paths.get(fileName);
+        createFile(path);
+        FileModifiedTimeUpdater fileModifiedTimeUpdater = new FileModifiedTimeUpdater(fileName);
+        FileTime fileTime = fileModifiedTimeUpdater.getLastModifiedTime();
+        // Advance the timestamp explicitly instead of sleeping; two seconds is coarser than the timestamp
+        // granularity of common file systems, so the stored value is guaranteed to differ.
+        Files.setLastModifiedTime(path, FileTime.fromMillis(fileTime.toMillis() + 2000));
+        Assert.assertTrue(fileModifiedTimeUpdater.checkAndRefresh());
+        Assert.assertNotEquals(fileTime, fileModifiedTimeUpdater.getLastModifiedTime());
+    }
+
+    /**
+     * Creates the file, and any missing parent directories, unless the file already exists.
+     *
+     * @param path file to create
+     * @throws IOException if a directory or the file cannot be created
+     */
+    public void createFile(Path path) throws IOException {
+        if (Files.notExists(path)) {
+            Files.createDirectories(path.getParent());
+            Files.createFile(path);
+        }
+    }
+
+    /** An untouched file must not be reported as changed, and the remembered time must stay the same. */
+    @Test(dataProvider = "files")
+    public void testFileNotModified(String fileName) throws IOException {
+        Path path = Paths.get(fileName);
+        createFile(path);
+        FileModifiedTimeUpdater fileModifiedTimeUpdater = new FileModifiedTimeUpdater(fileName);
+        FileTime fileTime = fileModifiedTimeUpdater.getLastModifiedTime();
+        Assert.assertFalse(fileModifiedTimeUpdater.checkAndRefresh());
+        Assert.assertEquals(fileTime, fileModifiedTimeUpdater.getLastModifiedTime());
+    }
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index ee8531d..828c371 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.discovery.service;
 
-import com.google.common.base.Preconditions;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Closeable;
@@ -37,12 +36,15 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
 
 /**
  * Main discovery-service which starts component to serve incoming discovery-request over
binary-proto channel and
@@ -60,6 +62,7 @@ public class DiscoveryService implements Closeable {
     private ZooKeeperClientFactory zkClientFactory = null;
     private BrokerDiscoveryProvider discoveryProvider;
     private final EventLoopGroup acceptorGroup;
+    @Getter
     private final EventLoopGroup workerGroup;
     private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
     private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
@@ -76,6 +79,7 @@ public class DiscoveryService implements Closeable {
 
     /**
      * Starts discovery service by initializing zookkeeper and server
+     * 
      * @throws Exception
      */
     public void start() throws Exception {
@@ -105,23 +109,23 @@ public class DiscoveryService implements Closeable {
 
         bootstrap.childHandler(new ServiceChannelInitializer(this, config, false));
         // Bind and start to accept incoming connections.
-        
-        Preconditions.checkArgument(config.getServicePort().isPresent() || config.getServicePortTls().isPresent(),

+
+        Preconditions.checkArgument(config.getServicePort().isPresent() || config.getServicePortTls().isPresent(),
                 "Either ServicePort or ServicePortTls should be configured.");
-        
+
         if (config.getServicePort().isPresent()) {
             // Bind and start to accept incoming connections.
             bootstrap.bind(config.getServicePort().get()).sync();
             LOG.info("Started Pulsar Discovery service on port {}", config.getServicePort());
         }
-        
+
         if (config.getServicePortTls().isPresent()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
             tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true));
             tlsBootstrap.bind(config.getServicePortTls().get()).sync();
             LOG.info("Started Pulsar Discovery TLS service on port {}", config.getServicePortTls().get());
         }
-        
+
     }
 
     public ZooKeeperClientFactory getZooKeeperClientFactory() {
@@ -172,8 +176,8 @@ public class DiscoveryService implements Closeable {
 
     public String serviceUrlTls() {
         if (config.getServicePortTls().isPresent()) {
-            return new StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls().get())
-                    .toString();
+            return new StringBuilder("pulsar+ssl://").append(host()).append(":")
+                    .append(config.getServicePortTls().get()).toString();
         } else {
             return null;
         }
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index d3d17c8..5c9da3a 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.discovery.service;
 
 import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.ServerSslContextRefresher;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
 
 import io.netty.channel.ChannelInitializer;
@@ -34,29 +34,33 @@ import io.netty.handler.ssl.SslContext;
 public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     public static final String TLS_HANDLER = "tls";
-    private final ServiceConfig serviceConfig;
     private final DiscoveryService discoveryService;
-    private final SslContext sslCtx;
+    private final boolean enableTls;
+    private final ServerSslContextRefresher sslCtxRefresher;
 
-    public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig,
boolean enableTLS)
+    public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig,
boolean e)
             throws Exception {
         super();
-        this.serviceConfig = serviceConfig;
         this.discoveryService = discoveryService;
-        if (enableTLS) {
-            this.sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(),
+        this.enableTls = e;
+        if (this.enableTls) {
+            sslCtxRefresher = new ServerSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
                     serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
                     serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.getTlsRequireTrustedClientCertOnConnect());
+                    serviceConfig.getTlsRequireTrustedClientCertOnConnect(),
+                    serviceConfig.getCertRefreshCheckDurationInMins());
         } else {
-            this.sslCtx = null;
+            this.sslCtxRefresher = null;
         }
     }
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
-        if (sslCtx != null) {
-            ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+        if (sslCtxRefresher != null && this.enableTls) {
+            SslContext sslContext = sslCtxRefresher.get();
+            if (sslContext != null) {
+                ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+            }
         }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize,
0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ServerConnection(discoveryService));
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index bc7900e..5c5e161 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -78,6 +78,8 @@ public class ServiceConfig implements PulsarConfiguration {
     /***** --- TLS --- ****/
     @Deprecated
     private boolean tlsEnabled = false;
+    // run certificate checks every X Mins
+    private long certRefreshCheckDurationInMins = 0;
     // Path for the TLS certificate file
     private String tlsCertificateFilePath;
     // Path for the TLS private key file
@@ -278,6 +280,14 @@ public class ServiceConfig implements PulsarConfiguration {
         this.tlsProtocols = tlsProtocols;
     }
 
+    public long getCertRefreshCheckDurationInMins() {
+        return certRefreshCheckDurationInMins;
+    }
+
+    public void setCertRefreshCheckDurationInMins(long certRefreshCheckDurationInMins) {
+        this.certRefreshCheckDurationInMins = certRefreshCheckDurationInMins;
+    }
+
     public Set<String> getTlsCiphers() {
         return tlsCiphers;
     }
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 24bcd6a..1e3f031 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -66,13 +66,8 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
 
-    private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class);
-
     private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
     private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index ee30656..252d6e9 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -39,14 +39,9 @@ import org.apache.pulsar.common.configuration.PropertiesContext;
 import org.apache.pulsar.common.configuration.PropertyContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 @Getter
 @Setter
 public class ProxyConfiguration implements PulsarConfiguration {
-    private final static Logger log = LoggerFactory.getLogger(ProxyConfiguration.class);
-
     @Category
     private static final String CATEGORY_SERVER = "Server";
     @Category
@@ -229,7 +224,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
 
     @Deprecated
     private boolean tlsEnabledInProxy = false;
-
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Time Interval in Mins between checks for Cert Refresh."
+    )
+    private long certRefreshCheckDurationInMins = 0;
     @FieldContext(
         category = CATEGORY_TLS,
         doc = "Path for the TLS certificate file"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index f87523e..bbccd26 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -20,12 +20,11 @@ package org.apache.pulsar.proxy.server;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
-import java.security.cert.X509Certificate;
-
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.api.PulsarDecoder;
-import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.ClientSslContextRefresher;
+import org.apache.pulsar.common.util.ServerSslContextRefresher;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
@@ -40,21 +39,24 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
 
     public static final String TLS_HANDLER = "tls";
     private final ProxyService proxyService;
-    private final SslContext serverSslCtx;
-    private final SslContext clientSslCtx;
+    private final ServerSslContextRefresher serverSslCtxRefresher;
+    private final ClientSslContextRefresher clientSslCtxRefresher;
+    private final boolean enableTls;
 
-    public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTLS)
+    public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTls)
             throws Exception {
         super();
         this.proxyService = proxyService;
+        this.enableTls = enableTls;
 
-        if (enableTLS) {
-            this.serverSslCtx = SecurityUtility.createNettySslContextForServer(true /* to allow InsecureConnection */,
+        if (enableTls) {
+            serverSslCtxRefresher = new ServerSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
                     serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
                     serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.isTlsRequireTrustedClientCertOnConnect());
+                    serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                    serviceConfig.getCertRefreshCheckDurationInMins());
         } else {
-            this.serverSslCtx = null;
+            this.serverSslCtxRefresher = null;
         }
 
         if (serviceConfig.isTlsEnabledWithBroker()) {
@@ -65,27 +67,25 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
                         serviceConfig.getBrokerClientAuthenticationParameters()).getAuthData();
             }
 
-            if (authData != null && authData.hasDataForTls()) {
-                    this.clientSslCtx = SecurityUtility.createNettySslContextForClient(
-                            serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getBrokerClientTrustCertsFilePath(),
-                            (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
-                } else {
-                    this.clientSslCtx = SecurityUtility.createNettySslContextForClient(
-                            serviceConfig.isTlsAllowInsecureConnection(),
-                            serviceConfig.getBrokerClientTrustCertsFilePath());
-            }
+            clientSslCtxRefresher = new ClientSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
+                    serviceConfig.getBrokerClientTrustCertsFilePath(), authData);
+
         } else {
-            this.clientSslCtx = null;
+            this.clientSslCtxRefresher = null;
         }
     }
 
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
-        if (serverSslCtx != null) {
-            ch.pipeline().addLast(TLS_HANDLER, serverSslCtx.newHandler(ch.alloc()));
+        if (serverSslCtxRefresher != null && this.enableTls) {
+            SslContext sslContext = serverSslCtxRefresher.get();
+            if (sslContext != null) {
+                ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+            }
         }
 
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-        ch.pipeline().addLast("handler", new ProxyConnection(proxyService, clientSslCtx));
+        ch.pipeline().addLast("handler",
+                new ProxyConnection(proxyService, clientSslCtxRefresher == null ? null : clientSslCtxRefresher.get()));
     }
 }


Mime
View raw message