zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kha...@apache.org
Subject zeppelin git commit: [ZEPPELIN-2318][branch-0.7] Fix proxy configuration for http client of zeppelinhub storage layer
Date Thu, 20 Apr 2017 06:03:31 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 69e70d515 -> 4dfb81500


[ZEPPELIN-2318][branch-0.7] Fix proxy configuration for http client of zeppelinhub storage
layer

### What is this PR for?
This resolves the issue for `branch-0.7`, since the original PR #2198 had conflicts with
the branch. For more details, check the original PR.

### What type of PR is it?
Bug Fix | Improvement

### Todos
* [x] - Task

### What is the Jira issue?
[ZEPPELIN-2318](https://issues.apache.org/jira/browse/ZEPPELIN-2318)

### How should this be tested?
check in #2198

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Khalid Huseynov <khalidhnv@gmail.com>
Author: LeiWang <wanglei6744@163.com>

Closes #2247 from khalidhuseynov/fix/branch-0.7-ZEPPELIN-2318 and squashes the following commits:

ec23c958a [Khalid Huseynov] edge case logs from error -> warn
e4db79ca0 [Khalid Huseynov] fix log
d8efb462f [Khalid Huseynov] fix websocket timing
e6622398d [Khalid Huseynov] add ssl setup
8f8109eaa [Khalid Huseynov] add close routine
7f3cd5040 [Khalid Huseynov] jetty client relay to asyncclient when proxy on
a65f73556 [Khalid Huseynov] add proxy client with asyncclient library
6add668ff [Khalid Huseynov] add dependency in pom, resolve conflict
d4cacadaf [LeiWang] fix bugs for timer saver


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/4dfb8150
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/4dfb8150
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/4dfb8150

Branch: refs/heads/branch-0.7
Commit: 4dfb815004c3d5a611c3ce16c6f6050813e24f38
Parents: 69e70d5
Author: Khalid Huseynov <khalidhnv@gmail.com>
Authored: Wed Mar 29 16:54:38 2017 +0900
Committer: Khalid Huseynov <khalidhnv@gmail.com>
Committed: Thu Apr 20 15:04:15 2017 +0900

----------------------------------------------------------------------
 pom.xml                                         |   7 +
 zeppelin-zengine/pom.xml                        |   5 +
 .../repo/zeppelinhub/ZeppelinHubRepo.java       |   3 +-
 .../repo/zeppelinhub/rest/HttpProxyClient.java  | 212 +++++++++++++++++++
 .../rest/ZeppelinhubRestApiHandler.java         | 114 +++++++---
 .../zeppelinhub/websocket/ZeppelinClient.java   |  17 +-
 6 files changed, 320 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5010675..4df0094 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@
     <jetty.version>9.2.15.v20160210</jetty.version>
     <httpcomponents.core.version>4.3.3</httpcomponents.core.version>
     <httpcomponents.client.version>4.3.6</httpcomponents.client.version>
+    <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
     <commons.lang.version>2.5</commons.lang.version>
     <commons.configuration.version>1.9</commons.configuration.version>
     <commons.codec.version>1.5</commons.codec.version>
@@ -174,6 +175,12 @@
       </dependency>
 
       <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpasyncclient</artifactId>
+        <version>${httpcomponents.asyncclient.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>commons-lang</groupId>
         <artifactId>commons-lang</artifactId>
         <version>${commons.lang.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index efe1c8c..730ada8 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -114,6 +114,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>${aws.sdk.s3.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index 2f33f6f..cd94180 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -217,7 +217,8 @@ public class ZeppelinHubRepo implements NotebookRepo {
 
   @Override
   public void close() {
-    //websocketClient.stop();
+    websocketClient.stop();
+    restApiClient.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
new file mode 100644
index 0000000..690a8b6
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier;
+import org.apache.http.conn.ssl.SSLContexts;
+import org.apache.http.conn.ssl.X509HostnameVerifier;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is http client class for the case of proxy usage
+ * jetty-client has issue with https over proxy for 9.2.x
+ *   https://github.com/eclipse/jetty.project/issues/408
+ *   https://github.com/eclipse/jetty.project/issues/827
+ *    
+ */
+
+public class HttpProxyClient {
+  private static final Logger LOG = LoggerFactory.getLogger(HttpProxyClient.class);
+  public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+  
+  private CloseableHttpAsyncClient client;
+  private URI proxyUri;
+  
+  public static HttpProxyClient newInstance(URI proxyUri) {
+    return new HttpProxyClient(proxyUri);
+  }
+  
+  private HttpProxyClient(URI uri) {
+    this.proxyUri = uri;
+    
+    client = getAsyncProxyHttpClient(proxyUri);
+    client.start();
+  }
+  
+  public URI getProxyUri() {
+    return proxyUri;
+  }
+  
+  private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) {
+    LOG.info("Creating async proxy http client");
+    PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager();
+    HttpHost proxy = new HttpHost(proxyUri.getHost(), proxyUri.getPort());
+    
+    HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
+    if (cm != null) {
+      clientBuilder = clientBuilder.setConnectionManager(cm);
+    }
+
+    if (proxy != null) {
+      clientBuilder = clientBuilder.setProxy(proxy);
+    }
+    clientBuilder = setRedirects(clientBuilder);
+    return clientBuilder.build();
+  }
+  
+  private PoolingNHttpClientConnectionManager getAsyncConnectionManager() {
+    ConnectingIOReactor ioReactor = null;
+    PoolingNHttpClientConnectionManager cm = null;
+    try {
+      ioReactor = new DefaultConnectingIOReactor();
+      // ssl setup
+      SSLContext sslcontext = SSLContexts.createSystemDefault();
+      X509HostnameVerifier hostnameVerifier = new BrowserCompatHostnameVerifier();
+      @SuppressWarnings("deprecation")
+      Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder
+          .<SchemeIOSessionStrategy>create()
+          .register("http", NoopIOSessionStrategy.INSTANCE)
+          .register("https", new SSLIOSessionStrategy(sslcontext, hostnameVerifier))
+          .build();
+
+      cm = new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry);
+    } catch (IOReactorException e) {
+      LOG.error("Couldn't initialize multi-threaded async client ", e);
+      return null;
+    }
+    return cm;
+  }
+  
+  private HttpAsyncClientBuilder setRedirects(HttpAsyncClientBuilder clientBuilder) {
+    clientBuilder.setRedirectStrategy(new DefaultRedirectStrategy() {
+      /** Redirectable methods. */
+      private String[] REDIRECT_METHODS = new String[] { 
+        HttpGet.METHOD_NAME, HttpPost.METHOD_NAME, 
+        HttpPut.METHOD_NAME, HttpDelete.METHOD_NAME, HttpHead.METHOD_NAME 
+      };
+
+      @Override
+      protected boolean isRedirectable(String method) {
+        for (String m : REDIRECT_METHODS) {
+          if (m.equalsIgnoreCase(method)) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+    return clientBuilder;
+  }
+  
+  public String sendToZeppelinHub(HttpRequestBase request,
+      boolean withResponse) throws IOException {
+    return withResponse ?
+        sendAndGetResponse(request) : sendWithoutResponseBody(request);
+  }
+  
+
+  private String sendWithoutResponseBody(HttpRequestBase request) throws IOException {
+    FutureCallback<HttpResponse> callback = getCallback(request);
+    client.execute(request, callback);
+    return StringUtils.EMPTY;
+  }
+  
+  private String sendAndGetResponse(HttpRequestBase request) throws IOException {
+    String data = StringUtils.EMPTY;
+    try {
+      HttpResponse response = client.execute(request, null).get(30, TimeUnit.SECONDS);
+      int code = response.getStatusLine().getStatusCode();
+      if (code == 200) {
+        try (InputStream responseContent = response.getEntity().getContent()) {
+          data = IOUtils.toString(responseContent, "UTF-8");
+        }
+      } else {
+        LOG.error("ZeppelinHub {} {} returned with status {} ", request.getMethod(),
+            request.getURI(), code);
+        throw new IOException("Cannot perform " + request.getMethod() + " request to ZeppelinHub");
+      }
+    } catch (InterruptedException | ExecutionException | TimeoutException
+        | NullPointerException e) {
+      throw new IOException(e);
+    }
+    return data;
+  }
+  
+  private FutureCallback<HttpResponse> getCallback(final HttpRequestBase request) {
+    return new FutureCallback<HttpResponse>() {
+
+      public void completed(final HttpResponse response) {
+        request.releaseConnection();
+        LOG.info("Note {} completed with {} status", request.getMethod(),
+            response.getStatusLine());
+      }
+
+      public void failed(final Exception ex) {
+        request.releaseConnection();
+        LOG.error("Note {} failed with {} message", request.getMethod(),
+            ex.getMessage());
+      }
+
+      public void cancelled() {
+        request.releaseConnection();
+        LOG.info("Note {} was canceled", request.getMethod());
+      }
+    };
+  }
+  
+  public void stop() {
+    try {
+      client.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close proxy client ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index f2ae7b9..437386c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +29,12 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
@@ -52,11 +60,10 @@ public class ZeppelinhubRestApiHandler {
   private static final String USER_SESSION_HEADER = "X-User-Session";
   private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
   private static boolean PROXY_ON = false;
-  private static String PROXY_HOST;
-  private static int PROXY_PORT;
-
+  //TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8
+  private static HttpProxyClient proxyClient;
   private final HttpClient client;
-  private final String zepelinhubUrl;
+  private String zepelinhubUrl;
 
   public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) {
     return new ZeppelinhubRestApiHandler(zeppelinhubUrl);
@@ -65,8 +72,7 @@ public class ZeppelinhubRestApiHandler {
   private ZeppelinhubRestApiHandler(String zeppelinhubUrl) {
     this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
 
-    //TODO(khalid):to make proxy conf consistent with Zeppelin confs
-    //readProxyConf();
+    readProxyConf();
     client = getAsyncClient();
 
     try {
@@ -74,48 +80,41 @@ public class ZeppelinhubRestApiHandler {
     } catch (Exception e) {
       LOG.error("Cannot initialize ZeppelinHub REST async client", e);
     }
-
   }
-
+  
   private void readProxyConf() {
-    //try reading http_proxy
-    String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
-        System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
+    //try reading https_proxy
+    String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
+        System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
     if (StringUtils.isBlank(proxyHostString)) {
-      //try https_proxy if no http_proxy
-      proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
-          System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
+      //try http_proxy if no https_proxy
+      proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
+          System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
     }
 
-    if (StringUtils.isBlank(proxyHostString)) {
-      PROXY_ON = false;
-    } else {
-      // host format - http://domain:port/
-      String[] parts = proxyHostString.replaceAll("/", "").split(":");
-      if (parts.length != 3) {
-        LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
-        PROXY_ON = false;
-        return;
+    if (!StringUtils.isBlank(proxyHostString)) {
+      URI uri = null;
+      try {
+        uri = new URI(proxyHostString);
+      } catch (URISyntaxException e) {
+        LOG.warn("Proxy uri doesn't follow correct syntax", e);
+      }
+      if (uri != null) {
+        PROXY_ON = true;
+        proxyClient = HttpProxyClient.newInstance(uri);
       }
-      PROXY_HOST = parts[1];
-      PROXY_PORT = Integer.parseInt(parts[2]);
-      LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
-      PROXY_ON = true;
     }
   }
 
   private HttpClient getAsyncClient() {
     SslContextFactory sslContextFactory = new SslContextFactory();
     HttpClient httpClient = new HttpClient(sslContextFactory);
-
     // Configure HttpClient
     httpClient.setFollowRedirects(false);
     httpClient.setMaxConnectionsPerDestination(100);
+
     // Config considerations
-    //TODO(khalid): consider using proxy
-    //TODO(khalid): consider whether require to follow redirects
     //TODO(khalid): consider multi-threaded connection manager case
-
     return httpClient;
   }
 
@@ -159,7 +158,11 @@ public class ZeppelinhubRestApiHandler {
       return StringUtils.EMPTY;
     }
     String url = zepelinhubUrl + argument;
-    return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
+    if (PROXY_ON) {
+      return sendToZeppelinHubViaProxy(new HttpGet(url), StringUtils.EMPTY, token, true);
+    } else {
+      return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
+    }
   }
   
   public String putWithResponseBody(String token, String url, String json) throws IOException
{
@@ -167,7 +170,11 @@ public class ZeppelinhubRestApiHandler {
       LOG.error("Empty note, cannot send it to zeppelinHub");
       throw new IOException("Cannot send emtpy note to zeppelinHub");
     }
-    return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
+    if (PROXY_ON) {
+      return sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl + url), json, token, true);
+    } else {
+      return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
+    }
   }
   
   public void put(String token, String jsonNote) throws IOException {
@@ -175,7 +182,11 @@ public class ZeppelinhubRestApiHandler {
       LOG.error("Cannot save empty note/string to ZeppelinHub");
       return;
     }
-    sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
+    if (PROXY_ON) {
+      sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl), jsonNote, token, false);
+    } else {
+      sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
+    }
   }
 
   public void del(String token, String argument) throws IOException {
@@ -183,7 +194,37 @@ public class ZeppelinhubRestApiHandler {
       LOG.error("Cannot delete empty note from ZeppelinHub");
       return;
     }
-    sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token,
false);
+    if (PROXY_ON) {
+      sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY,
token,
+          false);
+    } else {
+      sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token,
+          false);
+    }
+  }
+  
+  private String sendToZeppelinHubViaProxy(HttpRequestBase request, 
+                                           String json, 
+                                           String token,
+                                           boolean withResponse) throws IOException {
+    request.setHeader(ZEPPELIN_TOKEN_HEADER, token);
+    if (request.getMethod().equals(HttpPost.METHOD_NAME)) {
+      HttpPost post = (HttpPost) request;
+      StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
+      post.setEntity(content);
+    }
+    if (request.getMethod().equals(HttpPut.METHOD_NAME)) {
+      HttpPut put = (HttpPut) request;
+      StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
+      put.setEntity(content);
+    }
+    String body = StringUtils.EMPTY;
+    if (proxyClient != null) {
+      body = proxyClient.sendToZeppelinHub(request, withResponse);
+    } else {
+      LOG.warn("Proxy client request was submitted while not correctly initialized");
+    }
+    return body; 
   }
   
   private String sendToZeppelinHub(HttpMethod method,
@@ -243,6 +284,9 @@ public class ZeppelinhubRestApiHandler {
   public void close() {
     try {
       client.stop();
+      if (proxyClient != null) {
+        proxyClient.stop();
+      }
     } catch (Exception e) {
       LOG.info("Couldn't stop ZeppelinHub client properly", e);
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
index 9847e1c..b072251 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -137,9 +137,22 @@ public class ZeppelinClient {
     new Timer().schedule(new java.util.TimerTask() {
       @Override
       public void run() {
-        watcherSession = openWatcherSession();
+        int time = 0;
+        while (time < 5 * MIN) {
+          watcherSession = openWatcherSession();
+          if (watcherSession == null) {
+            try {
+              Thread.sleep(5000);
+              time += 5;
+            } catch (InterruptedException e) {
+              //continue
+            }
+          } else {
+            break;
+          }
+        }
       }
-    }, 10000);
+    }, 5000);
   }
 
   public void stop() {


Mime
View raw message