zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [zeppelin] branch master updated: [ZEPPELIN-4089] handle ipython kernel crash
Date Sun, 31 Mar 2019 12:54:32 GMT
This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 30c271b  [ZEPPELIN-4089] handle ipython kernel crash
30c271b is described below

commit 30c271b9bf1a9667f6991216a5a8dc32c0be996b
Author: marc hurabielle <marc.hurabielle@gmail.com>
AuthorDate: Sun Mar 24 17:43:22 2019 +0900

    [ZEPPELIN-4089] handle ipython kernel crash
    
    ### What is this PR for?
    
    This PR is the final fix for `ZEPPELIN-4089`. It aims to notify the user when the IPython kernel crashes. Currently, execution hangs forever when the IPython kernel crashes.
    
    ### What type of PR is it?
    Bug Fix
    
    ### Todos
    * [x] - logic to check kernel status
    * [ ] - decide what to do when we spot the failure
    * [x] - test
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4089
    
    ### How should this be tested?
    * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? no
    * Is there breaking changes for older versions? no
    * Does this needs documentation? no
    
    Author: marc hurabielle <marc.hurabielle@gmail.com>
    
    Closes #3339 from AyWa/fix/kernel-crash and squashes the following commits:
    
    6f4910c67 [marc hurabielle] fix lint
    73424d17d [marc hurabielle] Revert "example for kernel die and request stuck"
    f37d5f95d [marc hurabielle] add test for kernel die
    c9ec0335c [marc hurabielle] example for kernel die and request stuck
    5fe84dfef [marc hurabielle] handle kernel crash
    
    (cherry picked from commit ef5e173d3aa1e708cc8994a3f82a25d357ca7005)
    Signed-off-by: Jeff Zhang <zjffdu@apache.org>
---
 .../main/resources/grpc/python/ipython_server.py   | 44 ++++++++++++++++------
 .../zeppelin/python/IPythonInterpreterTest.java    | 44 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 4b68efd..36e0a13 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,6 +16,7 @@
 from __future__ import print_function
 
 import jupyter_client
+import os
 import sys
 import threading
 import time
@@ -25,8 +26,6 @@ import grpc
 import ipython_pb2
 import ipython_pb2_grpc
 
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
 is_py2 = sys.version[0] == '2'
 if is_py2:
     import Queue as queue
@@ -34,8 +33,6 @@ else:
     import queue as queue
 
 
-TIMEOUT = 60*60*24*365*100  # 100 years
-
 class IPython(ipython_pb2_grpc.IPythonServicer):
 
     def __init__(self, server):
@@ -73,13 +70,16 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
                                             output_hook=_output_hook,
-                                            timeout=TIMEOUT)
+                                            timeout=None)
             payload_reply.append(reply)
 
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
         t.start()
 
-        while t.is_alive():
+        # We want to ensure that the kernel is alive because in case of OOM or other errors
+        # Execution might be stuck there:
+        # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+        while t.is_alive() and self.isKernelAlive():
             while not stdout_queue.empty():
                 output = stdout_queue.get()
                 yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,6 +96,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
                                                   type=ipython_pb2.IMAGE,
                                                   output=output)
 
+        # if kernel is not alive (should be same as thread is still alive), means that we face
+        # an unexpected issue.
+        if not self.isKernelAlive() or t.is_alive():
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                                type=ipython_pb2.TEXT,
+                                                output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
+            return
+
         while not stdout_queue.empty():
             output = stdout_queue.get()
             yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -127,15 +135,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         return ipython_pb2.CancelResponse()
 
     def complete(self, request, context):
-        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
+        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
         return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
 
     def status(self, request, context):
         return ipython_pb2.StatusResponse(status = self._status)
 
+    def isKernelAlive(self):
+        return self._km.is_alive()
+
+    def terminate(self):
+        self._km.shutdown_kernel()
+
     def stop(self, request, context):
-        self._server.stop(0)
-        sys.exit(0)
+        self.terminate()
+        return ipython_pb2.StopResponse()
 
 
 def serve(port):
@@ -146,10 +160,16 @@ def serve(port):
     server.start()
     ipython.start()
     try:
-        while True:
-            time.sleep(_ONE_DAY_IN_SECONDS)
+        while ipython.isKernelAlive():
+            time.sleep(5)
     except KeyboardInterrupt:
-        server.stop(0)
+        print("interrupted")
+    finally:
+        print("shutdown")
+        # we let 2 sc for all request to be complete
+        server.stop(2)
+        ipython.terminate()
+        os._exit(0)
 
 if __name__ == '__main__':
     serve(sys.argv[1])
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 28e6270..1e4a709 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.junit.Test;
@@ -66,6 +67,49 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
   }
 
   @Test
+  public void testIpythonKernelCrash_shouldNotHangExecution()
+      throws InterpreterException, IOException {
+    // The goal of this test is to ensure that we handle case when the kernel die.
+    // In order to do so, we will kill the kernel process from the python code.
+    // A real example of that could be a out of memory by the code we execute.
+    String codeDep = "!pip install psutil";
+    String codeFindPID = "from os import getpid\n"
+        + "import psutil\n"
+        + "pids = psutil.pids()\n"
+        + "my_pid = getpid()\n"
+        + "pidToKill = []\n"
+        + "for pid in pids:\n"
+        + "    try:\n"
+        + "        p = psutil.Process(pid)\n"
+        + "        cmd = p.cmdline()\n"
+        + "        for arg in cmd:\n"
+        + "            if arg.count('ipykernel'):\n"
+        + "                pidToKill.append(pid)\n"
+        + "    except:\n"
+        + "        continue\n"
+        + "len(pidToKill)";
+    String codeKillKernel = "from os import kill\n"
+        + "import signal\n"
+        + "for pid in pidToKill:\n"
+        + "    kill(pid, signal.SIGKILL)";
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = interpreter.interpret(codeDep, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    context = getInterpreterContext();
+    result = interpreter.interpret(codeFindPID, context);
+    assertEquals(Code.SUCCESS, result.code());
+    InterpreterResultMessage output = context.out.toInterpreterResultMessage().get(0);
+    int numberOfPID = Integer.parseInt(output.getData());
+    assertTrue(numberOfPID > 0);
+    context = getInterpreterContext();
+    result = interpreter.interpret(codeKillKernel, context);
+    assertEquals(Code.ERROR, result.code());
+    output = context.out.toInterpreterResultMessage().get(0);
+    assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+        + "It might be because of an out of memory issue."));
+  }
+
+  @Test
   public void testIPythonAdvancedFeatures()
       throws InterpreterException, InterruptedException, IOException {
     // ipython help


Mime
View raw message