pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2297: fix Test: function-tls test stops server gracefully
Date Sun, 05 Aug 2018 23:47:21 GMT
sijie closed pull request #2297: fix Test: function-tls test stops server gracefully
URL: https://github.com/apache/incubator-pulsar/pull/2297
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 8f8b7ffd05..c57a8a0e59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -74,7 +74,6 @@
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
     String workerId;
     WorkerServer workerServer;
-    Thread serverThread;
     PulsarAdmin functionAdmin;
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int workerServicePort = PortManager.nextFreePort();
@@ -128,8 +127,7 @@ void setup(Method method) throws Exception {
         when(functionsWorkerService.getFunctionMetaDataManager()).thenReturn(dataManager);
 
         workerServer = new WorkerServer(functionsWorkerService);
-        serverThread = new Thread(workerServer, workerServer.getThreadName());
-        serverThread.start();
+        workerServer.start();
         Thread.sleep(2000);
         String functionTlsUrl = String.format("https://%s:%s",
                 functionsWorkerService.getWorkerConfig().getWorkerHostname(), workerServicePortTls);
@@ -153,14 +151,6 @@ void shutdown() throws Exception {
         functionAdmin.close();
         bkEnsemble.stop();
         workerServer.stop();
-        if (null != serverThread) {
-            serverThread.interrupt();
-            try {
-                serverThread.join();
-            } catch (InterruptedException e) {
-                log.warn("Worker server thread is interrupted", e);
-            }
-        }
         functionsWorkerService.stop();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 6e87b94cb4..cb73eaa16f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -38,7 +38,7 @@
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
-    private Thread serverThread;
+    private WorkerServer server;
 
     public Worker(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
@@ -57,15 +57,13 @@ protected void doStart() {
         }
     }
 
-    protected void doStartImpl() throws InterruptedException, IOException, PulsarAdminException
{
+    protected void doStartImpl() throws Exception {
         URI dlogUri = initialize(this.workerConfig);
 
         workerService.start(dlogUri);
-        WorkerServer server = new WorkerServer(workerService);
-        this.serverThread = new Thread(server, server.getThreadName());
-
+        this.server = new WorkerServer(workerService);
+        this.server.start();
         log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
-        this.serverThread.start();
     }
 
     private static URI initialize(WorkerConfig workerConfig)
@@ -150,13 +148,8 @@ private static URI initialize(WorkerConfig workerConfig)
 
     @Override
     protected void doStop() {
-        if (null != serverThread) {
-            serverThread.interrupt();
-            try {
-                serverThread.join();
-            } catch (InterruptedException e) {
-                log.warn("Worker server thread is interrupted", e);
-            }
+        if (null != this.server) {
+            this.server.stop();
         }
         workerService.stop();
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index f6e148e719..79b9093e7b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -53,7 +53,7 @@
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 @Slf4j
-public class WorkerServer implements Runnable {
+public class WorkerServer {
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
@@ -76,42 +76,41 @@ public WorkerServer(WorkerService workerService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
         this.webServerExecutor = Executors.newFixedThreadPool(NUM_ACCEPTORS, new DefaultThreadFactory("function-web"));
+        init();
     }
 
-    @Override
-    public void run() {
+    public void start() throws Exception {
+        server.start();
+        log.info("Worker Server started at {}", server.getURI());
+    }
+    
+    private void init() {
         server = new Server(new ExecutorThreadPool(webServerExecutor));
 
-        
         List<ServerConnector> connectors = new ArrayList<>();
         ServerConnector connector = new ServerConnector(server, 1, 1);
         connector.setPort(this.workerConfig.getWorkerPort());
         connector.setHost(this.workerConfig.getWorkerHostname());
         connectors.add(connector);
-        
+
         List<Handler> handlers = new ArrayList<>(3);
-        handlers.add(newServletContextHandler("/admin",
-                new ResourceConfig(Resources.getApiResources()), workerService));
-        handlers.add(newServletContextHandler("/admin/v2",
-                new ResourceConfig(Resources.getApiResources()), workerService));
-        handlers.add(newServletContextHandler("/",
-                new ResourceConfig(Resources.getRootResources()), workerService));
+        handlers.add(
+                newServletContextHandler("/admin", new ResourceConfig(Resources.getApiResources()),
workerService));
+        handlers.add(
+                newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiResources()),
workerService));
+        handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()),
workerService));
 
         ContextHandlerCollection contexts = new ContextHandlerCollection();
         contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
         HandlerCollection handlerCollection = new HandlerCollection();
-        handlerCollection.setHandlers(new Handler[] {
-            contexts, new DefaultHandler()
-        });
+        handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler() });
         server.setHandler(handlerCollection);
-        
+
         if (this.workerConfig.isTlsEnabled()) {
             try {
                 SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
-                        this.workerConfig.isTlsAllowInsecureConnection(),
-                        this.workerConfig.getTlsTrustCertsFilePath(),
-                        this.workerConfig.getTlsCertificateFilePath(),
-                        this.workerConfig.getTlsKeyFilePath(),
+                        this.workerConfig.isTlsAllowInsecureConnection(), this.workerConfig.getTlsTrustCertsFilePath(),
+                        this.workerConfig.getTlsCertificateFilePath(), this.workerConfig.getTlsKeyFilePath(),
                         this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
                 ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
                 tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
@@ -125,24 +124,6 @@ public void run() {
         // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
         connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS / connectors.size()));
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
-
-        try {
-            server.start();
-
-            log.info("Worker Server started at {}", server.getURI());
-
-            server.join();
-        } catch (Exception ex) {
-            log.error("ex: {}", ex, ex);
-            final String message = getErrorMessage(server, this.workerConfig.getWorkerPort(),
ex);
-            log.error(message);
-        } finally {
-            server.destroy();
-        }
-    }
-
-    public String getThreadName() {
-        return "worker-server-thread-" + this.workerConfig.getWorkerId();
     }
 
     public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig
config, WorkerService workerService) {
@@ -167,11 +148,14 @@ public static ServletContextHandler newServletContextHandler(String
contextPath,
     public void stop() {
         if (this.server != null) {
             try {
-                this.server.stop();
+                this.server.destroy();
             } catch (Exception e) {
                 log.error("Failed to stop function web-server ", e);
             }
         }
+        if (this.webServerExecutor != null && !this.webServerExecutor.isShutdown())
{
+            this.webServerExecutor.shutdown();
+        }
     }
     
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message