Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 957F6200B8E for ; Mon, 26 Sep 2016 10:51:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 94173160AC8; Mon, 26 Sep 2016 08:51:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 898D3160AB8 for ; Mon, 26 Sep 2016 10:51:53 +0200 (CEST) Received: (qmail 90473 invoked by uid 500); 26 Sep 2016 08:51:52 -0000 Mailing-List: contact commits-help@zeppelin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zeppelin.apache.org Delivered-To: mailing list commits@zeppelin.apache.org Received: (qmail 90464 invoked by uid 99); 26 Sep 2016 08:51:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Sep 2016 08:51:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 900E9DFAF2; Mon, 26 Sep 2016 08:51:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: minalee@apache.org To: commits@zeppelin.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: zeppelin git commit: Zeppelin 1307 - Implement notebook revision in Zeppelinhub repo Date: Mon, 26 Sep 2016 08:51:52 +0000 (UTC) archived-at: Mon, 26 Sep 2016 08:51:54 -0000 Repository: zeppelin Updated Branches: refs/heads/branch-0.6 a22a0355d -> a102e00fc Zeppelin 1307 - Implement notebook revision in Zeppelinhub repo Implement versioning in ZeppelinHub notebook storage. 
Improvement * [x] - Implement Versioning API * [ZEPPELIN-1307](https://issues.apache.org/jira/browse/ZEPPELIN-1307) Edit `zeppelin-env.sh` and add `org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo` in `ZEPPELIN_NOTEBOOK_STORAGE`. * Do the license files need an update? NO * Are there breaking changes for older versions? NO * Does this need documentation? NO Author: Anthony Corbacho Closes #1338 from anthonycorbacho/ZEPPELIN-1307 and squashes the following commits: dd57e7f [Anthony Corbacho] Fix NPE aef5cf3 [Anthony Corbacho] cleanup code 6cd9251 [Anthony Corbacho] revert change to try ressource stmnt 3b919a9 [Anthony Corbacho] Rework log trace 74a0cdb [Anthony Corbacho] change asyncPutWithResponseBody to accpet url instead of noteId 2395a6e [Anthony Corbacho] Light refactor of ZeppelinHubRestapiHandler and extract api call to a single method 5d4b54b [Anthony Corbacho] Implement checkpoint method 3942a78 [Anthony Corbacho] Implement get revision 9bd0946 [Anthony Corbacho] Close InputStream in asyncGet (cherry picked from commit 7f733ffb2ef5bfe30418028f696d267046dba833) Signed-off-by: Mina Lee Conflicts: zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a102e00f Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a102e00f Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a102e00f Branch: refs/heads/branch-0.6 Commit: a102e00fcac278f5aa590b1d5f1db68e458e9240 Parents: a22a035 Author: Anthony Corbacho Authored: Tue Aug 23 11:40:09 2016 +0900 Committer: Mina Lee Committed: Mon Sep 26 17:51:43 2016 +0900 ---------------------------------------------------------------------- .../repo/zeppelinhub/ZeppelinHubRepo.java | 39 +++++- .../rest/ZeppelinhubRestApiHandler.java | 118 ++++++++----------- 2 files changed, 80 insertions(+), 77 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a102e00f/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java index 45bf0a1..cf94049 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java @@ -33,6 +33,8 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -191,20 +193,45 @@ public class ZeppelinHubRepo implements NotebookRepo { @Override public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject) throws IOException { - // Auto-generated method stub - return null; + if (StringUtils.isBlank(noteId)) { + return null; + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint"); + String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg)); + String response = restApiClient.asyncPutWithResponseBody(endpoint, content); + + return GSON.fromJson(response, Revision.class); } @Override public Note get(String noteId, Revision rev, AuthenticationInfo subject) throws IOException { - // Auto-generated method stub - return null; + if (StringUtils.isBlank(noteId) || revId == null) { + return EMPTY_NOTE; + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint", rev.revId); + String response = restApiClient.asyncGet(endpoint); + Note note = 
GSON.fromJson(response, Note.class); + if (note == null) { + return EMPTY_NOTE; + } + LOG.info("ZeppelinHub REST API get note {} revision {}", noteId, rev.revId); + return note; } @Override public List revisionHistory(String noteId, AuthenticationInfo subject) { - // Auto-generated method stub - return null; + if (StringUtils.isBlank(noteId)) { + return Collections.emptyList(); + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint"); + List history = Collections.emptyList(); + try { + String response = restApiClient.asyncGet(endpoint); + history = GSON.fromJson(response, new TypeToken>(){}.getType()); + } catch (IOException e) { + LOG.error("Cannot get note history", e); + } + return history; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a102e00f/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java index 8f9b2e5..82159fc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java @@ -25,9 +25,8 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.client.util.StringContentProvider; import 
org.eclipse.jetty.http.HttpMethod; @@ -115,89 +114,66 @@ public class ZeppelinhubRestApiHandler { } public String asyncGet(String argument) throws IOException { - String note = StringUtils.EMPTY; + return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument); + } + + public String asyncPutWithResponseBody(String url, String json) throws IOException { + if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) { + LOG.error("Empty note, cannot send it to zeppelinHub"); + throw new IOException("Cannot send emtpy note to zeppelinHub"); + } + return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json); + } + + public void asyncPut(String jsonNote) throws IOException { + if (StringUtils.isBlank(jsonNote)) { + LOG.error("Cannot save empty note/string to ZeppelinHub"); + return; + } + sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote); + } + public void asyncDel(String argument) throws IOException { + if (StringUtils.isBlank(argument)) { + LOG.error("Cannot delete empty note from ZeppelinHub"); + return; + } + sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument); + } + + private String sendToZeppelinHub(HttpMethod method, String url) throws IOException { + return sendToZeppelinHub(method, url, StringUtils.EMPTY); + } + + private String sendToZeppelinHub(HttpMethod method, String url, String json) throws IOException { InputStreamResponseListener listener = new InputStreamResponseListener(); - client.newRequest(zepelinhubUrl + argument) - .header(ZEPPELIN_TOKEN_HEADER, token) - .send(listener); - - // Wait for the response headers to arrive Response response; + String data; + + Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token); + if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) && + !StringUtils.isBlank(json)) { + request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8"); + } + request.send(listener); + try { response = listener.get(30, 
TimeUnit.SECONDS); } catch (InterruptedException | TimeoutException | ExecutionException e) { - LOG.error("Cannot perform Get request to ZeppelinHub", e); - throw new IOException("Cannot load note from ZeppelinHub", e); + LOG.error("Cannot perform {} request to ZeppelinHub", method, e); + throw new IOException("Cannot perform " + method + " request to ZeppelinHub", e); } int code = response.getStatus(); if (code == 200) { try (InputStream responseContent = listener.getInputStream()) { - note = IOUtils.toString(responseContent, "UTF-8"); + data = IOUtils.toString(responseContent, "UTF-8"); } } else { - LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code); - throw new IOException("Cannot load note from ZeppelinHub"); - } - return note; - } - - public void asyncPut(String jsonNote) throws IOException { - if (StringUtils.isBlank(jsonNote)) { - LOG.error("Cannot save empty note/string to ZeppelinHub"); - return; - } - - client.newRequest(zepelinhubUrl).method(HttpMethod.PUT) - .header(ZEPPELIN_TOKEN_HEADER, token) - .content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8") - .send(new BufferingResponseListener() { - - @Override - public void onComplete(Result res) { - if (!res.isFailed() && res.getResponse().getStatus() == 200) { - LOG.info("Successfully saved note to ZeppelinHub with {}", - res.getResponse().getStatus()); - } else { - LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}", - res.getResponse().getStatus()); - } - } - - @Override - public void onFailure(Response response, Throwable failure) { - LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure); - } - }); - } - - public void asyncDel(String argument) { - if (StringUtils.isBlank(argument)) { - LOG.error("Cannot delete empty note from ZeppelinHub"); - return; + LOG.error("ZeppelinHub {} {} returned with status {} ", method, url, code); + throw new IOException("Cannot perform " + method + " request 
to ZeppelinHub"); } - client.newRequest(zepelinhubUrl + argument) - .method(HttpMethod.DELETE) - .header(ZEPPELIN_TOKEN_HEADER, token) - .send(new BufferingResponseListener() { - - @Override - public void onComplete(Result res) { - if (!res.isFailed() && res.getResponse().getStatus() == 200) { - LOG.info("Successfully removed note from ZeppelinHub with {}", - res.getResponse().getStatus()); - } else { - LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}", - res.getResponse().getStatus()); - } - } - - @Override - public void onFailure(Response response, Throwable failure) { - LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure); - } - }); + return data; } public void close() {