asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops
Date Sun, 18 Feb 2018 19:50:47 GMT
Michael Blow has submitted this change and it was merged.

Change subject: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops
......................................................................


[ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, when the HTTP server channel dropped unexpectedly,
  nothing was done.
- After this change, the HTTP server logs the event and keeps trying
  to re-bind to the port until it either succeeds or
  server.stop() is invoked.

Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2382
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
---
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
2 files changed, 147 insertions(+), 2 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Michael Blow: Looks good to me, approved; Verified



diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 19436ab..8ce1d70 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -73,7 +73,8 @@
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
-    private Channel channel;
+    private volatile Thread recoveryThread;
+    private volatile Channel channel;
     private Throwable cause;
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
@@ -132,6 +133,14 @@
                 LOGGER.log(Level.ERROR, "Failure stopping an Http Server", e);
                 setFailed(e);
                 throw e;
+            }
+        }
+        // Should wait for the recovery thread outside synchronized block
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            rt.join(TimeUnit.SECONDS.toMillis(5));
+            if (recoveryThread != null) {
+                LOGGER.log(Level.ERROR, "Failure stopping recovery thread of {}", this);
             }
         }
     }
@@ -209,6 +218,10 @@
          * Note that it doesn't work for the case where multiple paths map to a single IServlet
          */
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
+        channel = bind();
+    }
+
+    private Channel bind() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -216,10 +229,74 @@
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
-        channel = b.bind(port).sync().channel();
+        Channel newChannel = b.bind(port).sync().channel();
+        newChannel.closeFuture().addListener(f -> {
+            // This listener is invoked from within a netty IO thread. Hence, we can never block it
+            // For simplicity, we will submit the recovery task to a different thread
+            synchronized (lock) {
+                if (state != STARTED) {
+                    return;
+                }
+                LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery",
this);
+                triggerRecovery();
+            }
+        });
+        return newChannel;
+    }
+
+    /**
+     * Starts a new recovery thread that tries to revive the server channel.
+     * <p>
+     * Called from the server channel's close listener, i.e. on a netty IO thread while holding
+     * {@code lock}, so this method must never block. Any wait for a previous recovery attempt to
+     * finish is performed by the new recovery thread itself, not here.
+     */
+    private void triggerRecovery() {
+        final Thread previous = recoveryThread;
+        final Thread rt = new Thread(() -> awaitAndRecover(previous), "HttpServer-recovery-" + port);
+        recoveryThread = rt;
+        rt.start();
+    }
+
+    /**
+     * Body of a recovery thread. Waits for the previous recovery thread (if any) to terminate, then
+     * runs {@link #recover()}. Always unregisters itself as the recovery thread on exit.
+     *
+     * @param previous the recovery thread registered before this one, or {@code null}
+     */
+    private void awaitAndRecover(Thread previous) {
+        try {
+            if (previous != null) {
+                previous.join();
+            }
+            recover();
+        } catch (InterruptedException e) {
+            // e.g. doStop() interrupted us while we were still waiting for the previous attempt
+            LOGGER.log(Level.WARN, this + " recovery was interrupted", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            clearRecoveryThreadIfCurrent();
+        }
+    }
+
+    /**
+     * Unregisters the calling thread as the recovery thread. Does nothing if a different thread is
+     * registered, so a finishing attempt never hides a newer recovery thread from stop().
+     */
+    private void clearRecoveryThreadIfCurrent() {
+        synchronized (lock) {
+            if (recoveryThread == Thread.currentThread()) {
+                recoveryThread = null;
+            }
+        }
+    }
+
+    /**
+     * Repeatedly tries to re-bind the server channel while the server is {@code STARTED}.
+     * <p>
+     * Between failed attempts it waits up to 5s on {@code lock}, which releases the lock so a stop
+     * request can run. The loop ends on a successful bind, on interruption (doStop() interrupts the
+     * recovery thread), or once the state is no longer {@code STARTED}.
+     */
+    public void recover() {
+        try {
+            synchronized (lock) {
+                while (state == STARTED) {
+                    try {
+                        channel = bind();
+                        break;
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN,
+                                this + " was interrupted while attempting to revive server channel", e);
+                        setFailed(e);
+                        Thread.currentThread().interrupt();
+                        // Interruption is a stop request: do not retry bind() with the interrupt flag set
+                        break;
+                    } catch (Throwable th) {
+                        LOGGER.log(Level.WARN, this + " failed server recovery attempt. "
+                                + "Sleeping for 5s before starting the next attempt", th);
+                        try {
+                            // Wait on lock (rather than sleep) to allow a stop request to be executed
+                            lock.wait(TimeUnit.SECONDS.toMillis(5));
+                        } catch (InterruptedException e) {
+                            LOGGER.log(Level.WARN,
+                                    this + " interrupted while attempting to revive server channel", e);
+                            setFailed(e);
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                }
+            }
+        } finally {
+            clearRecoveryThreadIfCurrent();
+        }
     }
 
     protected void doStop() throws InterruptedException {
+        // stop recovery if it was ongoing
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            rt.interrupt();
+        }
         // stop taking new requests
         executor.shutdown();
         try {
@@ -300,4 +377,10 @@
     public int getWorkQueueSize() {
         return workQueue.size();
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\""
+ getState()
+                + "\"}";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 9067586..298d2de 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -48,13 +48,18 @@
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import io.netty.channel.Channel;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class HttpServerTest {
+    private static final Logger LOGGER = LogManager.getLogger();
     static final boolean PRINT_TO_CONSOLE = false;
     static final int PORT = 9898;
     static final String HOST = "localhost";
@@ -241,6 +246,63 @@
         }
     }
 
+    /**
+     * Verifies that the server revives itself after its server channel is closed unexpectedly.
+     * The test sends one request, then closes the server channel through reflection. It polls the
+     * private {@code recoveryThread} field until it becomes null (for about 10s). Finally, it retries
+     * a single request once per second, up to 10 times, until it succeeds.
+     */
+    @Test
+    public void testServerRevival() throws Exception {
+        int numExecutors = 16;
+        int serverQueueSize = 16;
+        int numRequests = 1;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            // send a request
+            request(numRequests);
+            for (Future<Void> thread : FUTURES) {
+                thread.get();
+            }
+            Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+            // close the channel
+            Field channelField = server.getClass().getDeclaredField("channel");
+            channelField.setAccessible(true);
+            Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
+            recoveryThreadField.setAccessible(true);
+            Channel channel = (Channel) channelField.get(server);
+            channel.close();
+            Thread.sleep(1000);
+            final int sleeps = 10;
+            for (int i = 0; i < sleeps; i++) {
+                Thread thread = (Thread) recoveryThreadField.get(server);
+                if (thread == null) {
+                    break;
+                }
+                LOGGER.log(Level.WARN,
+                        "Attempt #" + (i + 1) + ". Recovery thread is not null and has id " + thread.getId());
+                if (i == sleeps - 1) {
+                    throw new Exception("Http server recovery didn't complete after " + sleeps + "s");
+                }
+                Thread.sleep(1000);
+            }
+            for (int i = 0; i < sleeps; i++) {
+                request(1);
+                for (Future<Void> thread : FUTURES) {
+                    thread.get();
+                }
+                if (numRequests + 1 == SUCCESS_COUNT.get()) {
+                    break;
+                } else if (i == sleeps - 1) {
+                    throw new Exception(
+                            "Http server couldn't process requests correctly after recovery for " + sleeps + "s");
+                }
+                // Pause between attempts so the retries really span ~sleeps seconds, as the failure
+                // message claims, instead of hammering the just-revived server back-to-back.
+                Thread.sleep(1000);
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
     public static void setPrivateField(Object obj, String filedName, Object value) throws
Exception {
         Field f = obj.getClass().getDeclaredField(filedName);
         f.setAccessible(true);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2382
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Gerrit-PatchSet: 11
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Michael Blow <mblow@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message