asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [NO ISSUE][OTH] Interrupt Http Executor on Client Close
Date Fri, 25 Aug 2017 22:39:27 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1972

Change subject: [NO ISSUE][OTH] Interrupt Http Executor on Client Close
......................................................................

[NO ISSUE][OTH] Interrupt Http Executor on Client Close

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, we didn't know that an Http client closed the
  connection until we try to write and find that the channel has
  been closed.
- After this change, the moment the channel is closed, the http
  task is interrupted.

Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
---
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
A hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
4 files changed, 187 insertions(+), 92 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1972/1

diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 46b693b..154cff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -91,12 +92,13 @@
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
-        submit();
+        submit(ctx);
     }
 
-    private void submit() throws IOException {
+    private void submit(ChannelHandlerContext ctx) throws IOException {
         try {
-            server.getExecutor().submit(handler);
+            Future<Void> task = server.getExecutor().submit(handler);
+            ctx.channel().closeFuture().addListener(future -> task.cancel(true));
         } catch (RejectedExecutionException e) { // NOSONAR
             LOGGER.log(Level.WARNING, "Request rejected by server executor service. " + e.getMessage());
             handler.reject();
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 6bfa0cf..2a5a0a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -31,6 +31,7 @@
 public class SleepyServlet extends AbstractServlet {
 
     private volatile boolean sleep = true;
+    private int numSlept = 0;
 
     public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
@@ -46,8 +47,11 @@
         response.setStatus(HttpResponseStatus.OK);
         if (sleep) {
             synchronized (this) {
-                while (sleep) {
-                    this.wait();
+                if (sleep) {
+                    incrementSleptCount();
+                    while (sleep) {
+                        this.wait();
+                    }
                 }
             }
         }
@@ -55,6 +59,15 @@
         response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
     }
 
+    private void incrementSleptCount() {
+        numSlept++;
+        notifyAll();
+    }
+
+    public int getNumSlept() {
+        return numSlept;
+    }
+
     public synchronized void wakeUp() {
         sleep = false;
         notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
new file mode 100644
index 0000000..17f6f9a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpRequestTask implements Callable<Void> {
+
+    protected final HttpUriRequest request;
+
+    protected HttpRequestTask() throws URISyntaxException {
+        request = post(null);
+    }
+
+    @Override
+    public Void call() throws Exception {
+        try {
+            HttpResponse response = executeHttpRequest(request);
+            if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code())
{
+                HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+            } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code())
{
+                HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+            } else {
+                HttpServerTest.OTHER_COUNT.incrementAndGet();
+            }
+            InputStream in = response.getEntity().getContent();
+            if (HttpServerTest.PRINT_TO_CONSOLE) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+                String line = null;
+                while ((line = reader.readLine()) != null) {
+                    System.out.println(line);
+                }
+            }
+            IOUtils.closeQuietly(in);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        return null;
+    }
+
+    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+        HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+        try {
+            return client.execute(method);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    protected HttpUriRequest get(String query) throws URISyntaxException {
+        URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT,
HttpServerTest.PATH,
+                query, null);
+        RequestBuilder builder = RequestBuilder.get(uri);
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+
+    protected HttpUriRequest post(String query) throws URISyntaxException {
+        URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT,
HttpServerTest.PATH,
+                query, null);
+        RequestBuilder builder = RequestBuilder.post(uri);
+        StringBuilder str = new StringBuilder();
+        for (int i = 0; i < 32; i++) {
+            str.append("This is a string statement that will be ignored");
+            str.append('\n');
+        }
+        String statement = str.toString();
+        builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+        builder.addParameter("statement", statement);
+        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 66d1b77..ef26534 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -19,15 +19,12 @@
 package org.apache.hyracks.http.test;
 
 import java.io.BufferedReader;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.Socket;
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -35,14 +32,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
 import org.apache.hyracks.http.servlet.ChattyServlet;
@@ -63,6 +52,7 @@
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
     static final List<Future<Void>> FUTURES = new ArrayList<>();
+    static final List<HttpRequestTask> TASKS = new ArrayList<>();
     static final ExecutorService executor = Executors.newCachedThreadPool();
 
     @Before
@@ -70,25 +60,26 @@
         SUCCESS_COUNT.set(0);
         UNAVAILABLE_COUNT.set(0);
         OTHER_COUNT.set(0);
+        FUTURES.clear();
+        TASKS.clear();
     }
 
     @Test
     public void testOverloadingServer() throws Exception {
         WebManager webMgr = new WebManager();
         int numExecutors = 16;
-        int serverQueueSize = 16;
+        int queueSize = 16;
         int numRequests = 48;
-        HttpServer server =
-                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors,
serverQueueSize);
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, queueSize);
         SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
         webMgr.start();
-        int expectedSuccess = numExecutors + serverQueueSize;
+        int expectedSuccess = numExecutors + queueSize;
         int expectedUnavailable = numRequests - expectedSuccess;
         try {
             request(expectedSuccess);
-            waitTillQueued(server, serverQueueSize);
+            waitTillQueued(server, queueSize);
             ArrayList<Future<Void>> successSet = started();
             request(expectedUnavailable);
             ArrayList<Future<Void>> rejectedSet = started();
@@ -111,17 +102,55 @@
     }
 
     private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception {
-        int maxAttempts = 5;
+        int maxAttempts = 10;
         int attempt = 0;
         int queued = server.getWorkQueueSize();
         while (queued != expectedQueued) {
             attempt++;
             if (attempt > maxAttempts) {
                 throw new Exception("Number of queued requests (" + queued + ") didn't match
the expected number ("
-                        + expectedQueued + ")");
+                        + expectedQueued + ") during " + maxAttempts + "s");
             }
             Thread.sleep(1000); // NOSONAR polling is the clean way
             queued = server.getWorkQueueSize();
+        }
+    }
+
+    @Test
+    public void testInterruptOnClientClose() throws Exception {
+        WebManager webMgr = new WebManager();
+        int numExecutors = 1;
+        int queueSize = 1;
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, queueSize);
+        SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            request(1);
+            synchronized (servlet) {
+                while (servlet.getNumSlept() == 0) {
+                    servlet.wait();
+                }
+            }
+            request(1);
+            waitTillQueued(server, 1);
+            FUTURES.remove(0);
+            HttpRequestTask request = TASKS.remove(0);
+            request.request.abort();
+            waitTillQueued(server, 0);
+            synchronized (servlet) {
+                while (servlet.getNumSlept() == 1) {
+                    servlet.wait();
+                }
+            }
+            servlet.wakeUp();
+            for (Future<Void> f : FUTURES) {
+                f.get();
+            }
+            FUTURES.clear();
+        } finally {
+            webMgr.stop();
         }
     }
 
@@ -130,17 +159,16 @@
         WebManager webMgr = new WebManager();
         int numRequests = 64;
         int numExecutors = 2;
-        int serverQueueSize = 2;
+        int queueSize = 2;
         int numPatches = 60;
-        HttpServer server =
-                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors,
serverQueueSize);
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, queueSize);
         SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
         webMgr.start();
-        request(numExecutors + serverQueueSize);
+        request(numExecutors + queueSize);
         ArrayList<Future<Void>> stuck = started();
-        waitTillQueued(server, serverQueueSize);
+        waitTillQueued(server, queueSize);
         try {
             try {
                 for (int i = 0; i < numPatches; i++) {
@@ -166,6 +194,7 @@
     private ArrayList<Future<Void>> started() {
         ArrayList<Future<Void>> started = new ArrayList<>(FUTURES);
         FUTURES.clear();
+        TASKS.clear();
         return started;
     }
 
@@ -198,10 +227,9 @@
     @Test
     public void testMalformedString() throws Exception {
         int numExecutors = 16;
-        int serverQueueSize = 16;
+        int queueSize = 16;
         WebManager webMgr = new WebManager();
-        HttpServer server =
-                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors,
serverQueueSize);
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, queueSize);
         SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
@@ -237,69 +265,12 @@
         f.set(obj, value);
     }
 
-    private void request(int count) {
+    private void request(int count) throws URISyntaxException {
         for (int i = 0; i < count; i++) {
-            Future<Void> next = executor.submit(() -> {
-                try {
-                    HttpUriRequest request = post(null);
-                    HttpResponse response = executeHttpRequest(request);
-                    if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code())
{
-                        SUCCESS_COUNT.incrementAndGet();
-                    } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE
-                            .code()) {
-                        UNAVAILABLE_COUNT.incrementAndGet();
-                    } else {
-                        OTHER_COUNT.incrementAndGet();
-                    }
-                    InputStream in = response.getEntity().getContent();
-                    if (PRINT_TO_CONSOLE) {
-                        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-                        String line = null;
-                        while ((line = reader.readLine()) != null) {
-                            System.out.println(line);
-                        }
-                    }
-                    IOUtils.closeQuietly(in);
-                } catch (Throwable th) {
-                    th.printStackTrace();
-                    throw th;
-                }
-                return null;
-            });
+            HttpRequestTask requestTask = new HttpRequestTask();
+            Future<Void> next = executor.submit(requestTask);
             FUTURES.add(next);
+            TASKS.add(requestTask);
         }
-    }
-
-    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
-        HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
-        try {
-            return client.execute(method);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    protected HttpUriRequest get(String query) throws URISyntaxException {
-        URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
-        RequestBuilder builder = RequestBuilder.get(uri);
-        builder.setCharset(StandardCharsets.UTF_8);
-        return builder.build();
-    }
-
-    protected HttpUriRequest post(String query) throws URISyntaxException {
-        URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
-        RequestBuilder builder = RequestBuilder.post(uri);
-        StringBuilder str = new StringBuilder();
-        for (int i = 0; i < 32; i++) {
-            str.append("This is a string statement that will be ignored");
-            str.append('\n');
-        }
-        String statement = str.toString();
-        builder.setHeader("Content-type", "application/x-www-form-urlencoded");
-        builder.addParameter("statement", statement);
-        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
-        builder.setCharset(StandardCharsets.UTF_8);
-        return builder.build();
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1972
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message