zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anthonycorba...@apache.org
Subject zeppelin git commit: [ZEPPELIN-1697] Fix multiuser ws connection for secondary ZeppelinHubRepo storage
Date Tue, 21 Mar 2017 10:38:20 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 ecd40c0f8 -> c7ad6108e


[ZEPPELIN-1697] Fix multiuser ws connection for secondary ZeppelinHubRepo storage

### What is this PR for?
This is to fix problem with ws connection that was introduced in 0.7.0 with `ZeppelinHubRepo` storage layer.

### What type of PR is it?
Bug Fix

### Todos
* [x] - fix multi-user ws connection issue
* [x] - handle back connection
* [x] - handle switch tokens
* [x] - handle single token in conf

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1697

### How should this be tested?
scenario 1.
1. setup storage as in [here](https://zeppelin.apache.org/docs/0.7.0/storage/storage.html#storage-in-zeppelinhub), note that can use single storage as well (useful when switching instances)
2. setup authentication through ZeppelinHub realm as in [here](https://zeppelin.apache.org/docs/0.7.0/security/shiroauthentication.html#zeppelinhub).
3. login and run paragraph

scenario 2.
this scenario has basic Zeppelin authentication
1. setup storage as in [here](https://zeppelin.apache.org/docs/0.7.0/storage/storage.html#storage-in-zeppelinhub), note that can use single storage as well (useful when switching instances)
2. use non-authenticated mode
3. start zeppelin and run paragrpaph

other cases would be nice to try and report

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no

Author: Khalid Huseynov <khalidhnv@gmail.com>

Closes #2161 from khalidhuseynov/fix/ZEPPELIN-1697 and squashes the following commits:

0b76a9b [Khalid Huseynov] minor: add two paragraph update ops
37e4a9f [Khalid Huseynov] add watcher principal
4616a33 [Khalid Huseynov] update ops
4e36ce6 [Khalid Huseynov] token container instance -> getInstance
b465b6e [Khalid Huseynov] clean logs
7960400 [Khalid Huseynov] fix live msg
c24b8a1 [Khalid Huseynov] set conf token for anonymous
941794f [Khalid Huseynov] realm token precedence over conf token
91cb537 [Khalid Huseynov] handle empty tokens in rest handler
d14f04f [Khalid Huseynov] switch tokens routine
5755a5c [Khalid Huseynov] fix ws back connection
7e1ac5d [Khalid Huseynov] fix empty token case
39a9a18 [Khalid Huseynov] multi ws one way connection

(cherry picked from commit a105a21a93145e0dfe0c91466d6a3700b83bd9c6)
Signed-off-by: Anthony Corbacho <corbacho.anthony@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c7ad6108
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c7ad6108
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c7ad6108

Branch: refs/heads/branch-0.7
Commit: c7ad6108e57220ed9ba758ea033f5ed731281a65
Parents: ecd40c0
Author: Khalid Huseynov <khalidhnv@gmail.com>
Authored: Tue Mar 21 15:34:36 2017 +0900
Committer: Anthony Corbacho <corbacho.anthony@gmail.com>
Committed: Tue Mar 21 19:38:14 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/realm/ZeppelinHubRealm.java |  29 ++--
 .../apache/zeppelin/ticket/TicketContainer.java |  82 ----------
 .../notebook/repo/NotebookRepoSync.java         |   1 -
 .../repo/zeppelinhub/ZeppelinHubRepo.java       |  95 ++++--------
 .../zeppelinhub/model/UserSessionContainer.java |   2 +-
 .../zeppelinhub/model/UserTokenContainer.java   | 149 +++++++++++++++++++
 .../rest/ZeppelinhubRestApiHandler.java         |   9 +-
 .../repo/zeppelinhub/websocket/Client.java      |   4 +-
 .../zeppelinhub/websocket/ZeppelinClient.java   |  87 +++++++++--
 .../websocket/ZeppelinhubClient.java            |  76 +++++++---
 .../websocket/listener/WatcherWebsocket.java    |   7 +-
 .../scheduler/ZeppelinHubHeartbeat.java         |   7 +-
 .../websocket/utils/ZeppelinhubUtils.java       |  30 ++++
 .../apache/zeppelin/ticket/TicketContainer.java |  82 ++++++++++
 .../websocket/ZeppelinClientTest.java           |  10 +-
 15 files changed, 460 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
index c37a0a8..d84207d 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
@@ -38,6 +38,7 @@ import org.apache.shiro.authz.AuthorizationInfo;
 import org.apache.shiro.realm.AuthorizingRealm;
 import org.apache.shiro.subject.PrincipalCollection;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -155,15 +156,8 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
       throw new AuthenticationException("Cannot login to ZeppelinHub");
     }
 
-    // Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
-    // to get specific information about the current user.
-    UserSessionContainer.instance.setSession(account.login, userSession);
-
-    /* TODO(khalid): add proper roles and add listener */
-    HashSet<String> userAndRoles = new HashSet<String>();
-    userAndRoles.add(account.login);
-    ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
-        new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
+    onLoginSuccess(account.login, userSession);
+    
     return account;
   }
 
@@ -213,4 +207,21 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
     public String email;
     public String name;
   }
+  
+  public void onLoginSuccess(String username, String session) {
+    UserSessionContainer.instance.setSession(username, session);
+
+    /* TODO(xxx): add proper roles */
+    HashSet<String> userAndRoles = new HashSet<String>();
+    userAndRoles.add(username);
+    ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
+        new org.apache.zeppelin.user.AuthenticationInfo(username), userAndRoles);
+
+    ZeppelinhubUtils.userLoginRoutine(username);
+  }
+  
+  @Override
+  public void onLogout(PrincipalCollection principals) {
+    ZeppelinhubUtils.userLogoutRoutine((String) principals.getPrimaryPrincipal());
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-server/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
deleted file mode 100644
index 513bb4a..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ticket;
-
-import java.util.Calendar;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Very simple ticket container
- * No cleanup is done, since the same user accross different devices share the same ticket
- * The Map size is at most the number of different user names having access to a Zeppelin instance
- */
-
-
-public class TicketContainer {
-  private static class Entry {
-    public final String ticket;
-    // lastAccessTime still unused
-    public final long lastAccessTime;
-
-    Entry(String ticket) {
-      this.ticket = ticket;
-      this.lastAccessTime = Calendar.getInstance().getTimeInMillis();
-    }
-  }
-
-  private Map<String, Entry> sessions = new ConcurrentHashMap<>();
-
-  public static final TicketContainer instance = new TicketContainer();
-
-  /**
-   * For test use
-   * @param principal
-   * @param ticket
-   * @return true if ticket assigned to principal.
-   */
-  public boolean isValid(String principal, String ticket) {
-    if ("anonymous".equals(principal) && "anonymous".equals(ticket))
-      return true;
-    Entry entry = sessions.get(principal);
-    return entry != null && entry.ticket.equals(ticket);
-  }
-
-  /**
-   * get or create ticket for Websocket authentication assigned to authenticated shiro user
-   * For unathenticated user (anonymous), always return ticket value "anonymous"
-   * @param principal
-   * @return
-   */
-  public synchronized String getTicket(String principal) {
-    Entry entry = sessions.get(principal);
-    String ticket;
-    if (entry == null) {
-      if (principal.equals("anonymous"))
-        ticket = "anonymous";
-      else
-        ticket = UUID.randomUUID().toString();
-    } else {
-      ticket = entry.ticket;
-    }
-    entry = new Entry(ticket);
-    sessions.put(principal, entry);
-    return ticket;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
index 8553349..6bbd5bc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
@@ -75,7 +75,6 @@ public class NotebookRepoSync implements NotebookRepo {
     }
 
     for (int i = 0; i < Math.min(storageClassNames.length, getMaxRepoNum()); i++) {
-      @SuppressWarnings("static-access")
       Class<?> notebookStorageClass;
       try {
         notebookStorageClass = getClass().forName(storageClassNames[i].trim());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index 2524e7a..2f33f6f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -22,8 +22,6 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -32,8 +30,11 @@ import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,27 +56,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
   public static final String TOKEN_HEADER = "X-Zeppelin-Token";
   private static final Gson GSON = new Gson();
   private static final Note EMPTY_NOTE = new Note();
-  //private final Client websocketClient;
+  private final Client websocketClient;
+  private final UserTokenContainer tokenManager;
 
   private String token;
   private ZeppelinhubRestApiHandler restApiClient;
   
   private final ZeppelinConfiguration conf;
-
-  // In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
-  private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();
   
   public ZeppelinHubRepo(ZeppelinConfiguration conf) {
     this.conf = conf;
     String zeppelinHubUrl = getZeppelinHubUrl(conf);
     LOG.info("Initializing ZeppelinHub integration module");
+
     token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
-    restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
+    restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl);
+    //TODO(khalid): check which realm for authentication, pass to token manager
+    tokenManager = UserTokenContainer.init(restApiClient, token);
 
-    // TODO(xxx): refactor this in the next itaration
-    //websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
-    //    getZeppelinhubWebsocketUri(conf), token, conf);
-    //websocketClient.start();
+    websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
+        getZeppelinhubWebsocketUri(conf), token, conf);
+    websocketClient.start();
   }
 
   private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
@@ -155,58 +156,6 @@ public class ZeppelinHubRepo implements NotebookRepo {
     }
     return zeppelinhubUrl;
   }
-  
-  /**
-   * Get list of user instances from Zeppelinhub.
-   * This will avoid and remove the needs of setting up token in zeppelin-env.sh.
-   */
-  private List<Instance> getUserInstances(String ticket) throws IOException {
-    if (StringUtils.isBlank(ticket)) {
-      return Collections.emptyList();
-    }
-    return restApiClient.getInstances(ticket);
-  }
-
-  /**
-   * Get user default instance.
-   * From now, it will be from the first instance from the list,
-   * But later we can think about marking a default one and return it instead :)
-   */
-  private String getDefaultZeppelinInstanceToken(String ticket) throws IOException {
-    List<Instance> instances = getUserInstances(ticket);
-    if (instances.isEmpty()) {
-      return StringUtils.EMPTY;
-    }
-
-    String token = instances.get(0).token;
-    LOG.debug("The following instance has been assigned {} with token {}", instances.get(0).name,
-        token);
-    return token;
-  }
-
-  /**
-   * For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
-   *
-   */
-  private String getUserToken(String principal) {
-    // Case of user use token instead of authentication.
-    if (!StringUtils.isBlank(token)) {
-      return token;
-    }
-
-    String token = usersToken.get(principal);
-    if (StringUtils.isBlank(token)) {
-      String ticket = UserSessionContainer.instance.getSession(principal);
-      try {
-        token = getDefaultZeppelinInstanceToken(ticket);
-        usersToken.putIfAbsent(principal, token);
-      } catch (IOException e) {
-        LOG.error("Cannot get user token", e);
-        token = StringUtils.EMPTY;
-      }
-    }
-    return token;
-  }
 
   private boolean isSubjectValid(AuthenticationInfo subject) {
     if (subject == null) {
@@ -292,7 +241,6 @@ public class ZeppelinHubRepo implements NotebookRepo {
       return EMPTY_NOTE;
     }
     String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
-    
     String token = getUserToken(subject.getUser());
     String response = restApiClient.get(token, endpoint);
 
@@ -320,6 +268,10 @@ public class ZeppelinHubRepo implements NotebookRepo {
     }
     return history;
   }
+  
+  private String getUserToken(String user) {
+    return tokenManager.getUserToken(user);
+  }
 
   @Override
   public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) {
@@ -335,7 +287,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
     List<Map<String, String>> values = Lists.newLinkedList();
 
     try {
-      instances = getUserInstances(zeppelinHubUserSession);
+      instances = tokenManager.getUserInstances(zeppelinHubUserSession);
     } catch (IOException e) {
       LOG.warn("Couldnt find instances for the session {}, returning empty collection",
           zeppelinHubUserSession);
@@ -368,19 +320,24 @@ public class ZeppelinHubRepo implements NotebookRepo {
     LOG.info("User {} will switch instance", user);
     String ticket = UserSessionContainer.instance.getSession(user);
     List<Instance> instances;
+    String currentToken = StringUtils.EMPTY, targetToken = StringUtils.EMPTY;
     try {
-      instances = getUserInstances(ticket);
+      instances = tokenManager.getUserInstances(ticket);
       if (instances.isEmpty()) {
         return;
       }
-
+      currentToken = tokenManager.getExistingUserToken(user);
       for (Instance instance : instances) {
         if (instance.id == instanceId) {
-          LOG.info("User {} switched to instance {}", user, instances.get(0).name);
-          usersToken.put(user, instance.token);
+          LOG.info("User {} switched to instance {}", user, instance.name);
+          tokenManager.setUserToken(user, instance.token);
+          targetToken = instance.token;
           break;
         }
       }
+      if (!StringUtils.isBlank(currentToken) && !StringUtils.isBlank(targetToken)) {
+        ZeppelinhubUtils.userSwitchTokenRoutine(user, currentToken, targetToken);
+      }
     } catch (IOException e) {
       LOG.error("Cannot switch instance for user {}", user, e);
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
index c741e77..7f035b1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang.StringUtils;
 public class UserSessionContainer {
   private static class Entity {
     public final String userSession;
-
+    
     Entity(String userSession) {
       this.userSession = userSession;
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java
new file mode 100644
index 0000000..b594f89
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserTokenContainer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * User token manager class.
+ *
+ */
+
+public class UserTokenContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(UserTokenContainer.class);
+  private static UserTokenContainer instance = null;
+  private ConcurrentMap<String, String> userTokens = new ConcurrentHashMap<String, String>();
+  private final ZeppelinhubRestApiHandler restApiClient;
+  private String defaultToken;
+
+  public static UserTokenContainer init(ZeppelinhubRestApiHandler restClient, 
+      String defaultToken) {
+    if (instance == null) {
+      instance = new UserTokenContainer(restClient, defaultToken);
+    }
+    return instance;
+  }
+
+  private UserTokenContainer(ZeppelinhubRestApiHandler restClient, String defaultToken) {
+    restApiClient = restClient;
+    this.defaultToken = defaultToken;
+  }
+
+  public static UserTokenContainer getInstance() {
+    return instance;
+  }
+  
+  public void setUserToken(String username, String token) {
+    if (StringUtils.isBlank(username) || StringUtils.isBlank(token)) {
+      LOG.warn("Can't set empty user token");
+      return;
+    }
+    userTokens.put(username, token);
+  }
+  
+  public String getUserToken(String principal) {
+    if (StringUtils.isBlank(principal) || "anonymous".equals(principal)) {
+      if (StringUtils.isBlank(defaultToken)) {
+        return StringUtils.EMPTY;
+      } else {
+        userTokens.putIfAbsent(principal, defaultToken);
+        return defaultToken;
+      }
+    }
+    String token = userTokens.get(principal);
+    if (StringUtils.isBlank(token)) {
+      String ticket = UserSessionContainer.instance.getSession(principal);
+      try {
+        token = getDefaultZeppelinInstanceToken(ticket);
+        if (StringUtils.isBlank(token)) {
+          if (!StringUtils.isBlank(defaultToken)) {
+            token = defaultToken;
+          }
+        } else {
+          userTokens.putIfAbsent(principal, token);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot get user token", e);
+        token = StringUtils.EMPTY;
+      }
+    }
+    return token;
+  }
+  
+  public String getExistingUserToken(String principal) {
+    if (StringUtils.isBlank(principal) || "anonymous".equals(principal)) {
+      return StringUtils.EMPTY;
+    }
+    String token = userTokens.get(principal);
+    if (token == null) {
+      return StringUtils.EMPTY;
+    }
+    return token;
+  }
+  
+  public String removeUserToken(String username) {
+    return userTokens.remove(username);
+  }
+  
+  /**
+   * Get user default instance.
+   * From now, it will be from the first instance from the list,
+   * But later we can think about marking a default one and return it instead :)
+   */
+  public String getDefaultZeppelinInstanceToken(String ticket) throws IOException {
+    List<Instance> instances = getUserInstances(ticket);
+    if (instances.isEmpty()) {
+      return StringUtils.EMPTY;
+    }
+
+    String token = instances.get(0).token;
+    LOG.debug("The following instance has been assigned {} with token {}", instances.get(0).name,
+        token);
+    return token;
+  }
+  
+  /**
+   * Get list of user instances from Zeppelinhub.
+   * This will avoid and remove the needs of setting up token in zeppelin-env.sh.
+   */
+  public List<Instance> getUserInstances(String ticket) throws IOException {
+    if (StringUtils.isBlank(ticket)) {
+      return Collections.emptyList();
+    }
+    return restApiClient.getInstances(ticket);
+  }
+  
+  public List<String> getAllTokens() {
+    return new ArrayList<String>(userTokens.values());
+  }
+  
+  public Map<String, String> getAllUserTokens() {
+    return new HashMap<String, String>(userTokens);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index a913d85..f2ae7b9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -58,11 +58,11 @@ public class ZeppelinhubRestApiHandler {
   private final HttpClient client;
   private final String zepelinhubUrl;
 
-  public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl, String token) {
-    return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
+  public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) {
+    return new ZeppelinhubRestApiHandler(zeppelinhubUrl);
   }
 
-  private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
+  private ZeppelinhubRestApiHandler(String zeppelinhubUrl) {
     this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
 
     //TODO(khalid):to make proxy conf consistent with Zeppelin confs
@@ -155,6 +155,9 @@ public class ZeppelinhubRestApiHandler {
   }
 
   public String get(String token, String argument) throws IOException {
+    if (StringUtils.isBlank(token)) {
+      return StringUtils.EMPTY;
+    }
     String url = zepelinhubUrl + argument;
     return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
index cd4d4a6..87a1a8f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
@@ -72,8 +72,8 @@ public class Client {
     }
   }
 
-  public void relayToZeppelinHub(String message) {
-    zeppelinhubClient.send(message);
+  public void relayToZeppelinHub(String message, String token) {
+    zeppelinhubClient.send(message, token);
   }
 
   public void relayToZeppelin(Message message, String noteId) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
index e05a746..fe70f71 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -18,8 +18,11 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +30,8 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
@@ -53,7 +58,6 @@ import com.google.gson.JsonSyntaxException;
 public class ZeppelinClient {
   private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class);
   private final URI zeppelinWebsocketUrl;
-  private final String zeppelinhubToken;
   private final WebSocketClient wsClient;
   private static Gson gson;
   // Keep track of current open connection per notebook.
@@ -65,6 +69,20 @@ public class ZeppelinClient {
   private Authentication authModule;
   private static final int MIN = 60;
 
+  private static final Set<String> actionable = new  HashSet<String>(Arrays.asList(
+      // running events
+      "ANGULAR_OBJECT_UPDATE",
+      "PROGRESS",
+      "NOTE",
+      "PARAGRAPH",
+      "PARAGRAPH_UPDATE_OUTPUT",
+      "PARAGRAPH_APPEND_OUTPUT",
+      "PARAGRAPH_CLEAR_OUTPUT",
+      "PARAGRAPH_REMOVE",
+      // run or stop events
+      "RUN_PARAGRAPH",
+      "CANCEL_PARAGRAPH"));
+
   public static ZeppelinClient initialize(String zeppelinUrl, String token, 
       ZeppelinConfiguration conf) {
     if (instance == null) {
@@ -79,7 +97,6 @@ public class ZeppelinClient {
 
   private ZeppelinClient(String zeppelinUrl, String token, ZeppelinConfiguration conf) {
     zeppelinWebsocketUrl = URI.create(zeppelinUrl);
-    zeppelinhubToken = token;
     wsClient = createNewWebsocketClient();
     gson = new Gson();
     notesConnection = new ConcurrentHashMap<>();
@@ -121,7 +138,7 @@ public class ZeppelinClient {
       public void run() {
         watcherSession = openWatcherSession();
       }
-    }, 5000);
+    }, 10000);
   }
 
   public void stop() {
@@ -185,7 +202,7 @@ public class ZeppelinClient {
   }
 
   public void send(Message msg, String noteId) {
-    Session noteSession = getZeppelinConnection(noteId);
+    Session noteSession = getZeppelinConnection(noteId, msg.principal, msg.ticket);
     if (!isSessionOpen(noteSession)) {
       LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
       return;
@@ -193,12 +210,12 @@ public class ZeppelinClient {
     noteSession.getRemote().sendStringByFuture(serialize(msg));
   }
   
-  public Session getZeppelinConnection(String noteId) {
+  public Session getZeppelinConnection(String noteId, String principal, String ticket) {
     if (StringUtils.isBlank(noteId)) {
       LOG.warn("Cannot get Websocket session with blanck noteId");
       return null;
     }
-    return getNoteSession(noteId);
+    return getNoteSession(noteId, principal, ticket);
   }
   
 /*
@@ -211,18 +228,18 @@ public class ZeppelinClient {
   }
   */
 
-  private Session getNoteSession(String noteId) {
+  private Session getNoteSession(String noteId, String principal, String ticket) {
     LOG.info("Getting Note websocket connection for note {}", noteId);
     Session session = notesConnection.get(noteId);
     if (!isSessionOpen(session)) {
       LOG.info("No open connection for note {}, opening one", noteId);
       notesConnection.remove(noteId);
-      session = openNoteSession(noteId);
+      session = openNoteSession(noteId, principal, ticket);
     }
     return session;
   }
   
-  private Session openNoteSession(String noteId) {
+  private Session openNoteSession(String noteId, String principal, String ticket) {
     ClientUpgradeRequest request = new ClientUpgradeRequest();
     ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
     Future<Session> future = null;
@@ -239,7 +256,7 @@ public class ZeppelinClient {
       session.close();
       session = notesConnection.get(noteId);
     } else {
-      String getNote = serialize(zeppelinGetNoteMsg(noteId));
+      String getNote = serialize(zeppelinGetNoteMsg(noteId, principal, ticket));
       session.getRemote().sendStringByFuture(getNote);
       notesConnection.put(noteId, session);
     }
@@ -250,31 +267,71 @@ public class ZeppelinClient {
     return (session != null) && (session.isOpen());
   }
 
-  private Message zeppelinGetNoteMsg(String noteId) {
+  private Message zeppelinGetNoteMsg(String noteId, String principal, String ticket) {
     Message getNoteMsg = new Message(Message.OP.GET_NOTE);
     HashMap<String, Object> data = new HashMap<String, Object>();
     data.put("id", noteId);
     getNoteMsg.data = data;
+    getNoteMsg.principal = principal;
+    getNoteMsg.ticket = ticket;
     return getNoteMsg;
   }
 
   public void handleMsgFromZeppelin(String message, String noteId) {
     Map<String, String> meta = new HashMap<>();
-    meta.put("token", zeppelinhubToken);
+    //TODO(khalid): don't use zeppelinhubToken in this class, decouple
     meta.put("noteId", noteId);
     Message zeppelinMsg = deserialize(message);
     if (zeppelinMsg == null) {
       return;
     }
-    ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
+    String token;
+    if (!isActionable(zeppelinMsg.op)) {
+      return;
+    }
+    
+    token = UserTokenContainer.getInstance().getUserToken(zeppelinMsg.principal);
     Client client = Client.getInstance();
     if (client == null) {
       LOG.warn("Client isn't initialized yet");
       return;
     }
-    client.relayToZeppelinHub(hubMsg.serialize());
+    ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
+    if (StringUtils.isEmpty(token)) {
+      relayToAllZeppelinHub(hubMsg, noteId);
+    } else {
+      client.relayToZeppelinHub(hubMsg.serialize(), token);
+    }
+
+  }
+
+  private void relayToAllZeppelinHub(ZeppelinhubMessage hubMsg, String noteId) {
+    if (StringUtils.isBlank(noteId)) {
+      return;
+    }
+    NotebookAuthorization noteAuth = NotebookAuthorization.getInstance();
+    Map<String, String> userTokens = UserTokenContainer.getInstance().getAllUserTokens();
+    Client client = Client.getInstance();
+    Set<String> userAndRoles;
+    String token;
+    for (String user: userTokens.keySet()) {
+      userAndRoles = noteAuth.getRoles(user);
+      userAndRoles.add(user);
+      if (noteAuth.isReader(noteId, userAndRoles)) {
+        token = userTokens.get(user);
+        hubMsg.meta.put("token", token);
+        client.relayToZeppelinHub(hubMsg.serialize(), token);
+      }
+    }
   }
 
+  private boolean isActionable(OP action) {
+    if (action == null) {
+      return false;
+    }
+    return actionable.contains(action.name());
+  }
+  
   public void removeNoteConnection(String noteId) {
     if (StringUtils.isBlank(noteId)) {
       LOG.error("Cannot remove session for empty noteId");
@@ -307,7 +364,7 @@ public class ZeppelinClient {
 
   public void ping() {
     if (watcherSession == null) {
-      LOG.info("Cannot send PING event, no watcher found");
+      LOG.debug("Cannot send PING event, no watcher found");
       return;
     }
     watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
index b7a87ad..f692938 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.HttpCookie;
 import java.net.URI;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.Zeppelinh
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.ticket.TicketContainer;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@@ -58,7 +60,6 @@ public class ZeppelinhubClient {
 
   private final WebSocketClient client;
   private final URI zeppelinhubWebsocketUrl;
-  private final ClientUpgradeRequest conectionRequest;
   private final String zeppelinhubToken;
 
   private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
@@ -66,7 +67,8 @@ public class ZeppelinhubClient {
   private static Gson gson;
   
   private SchedulerService schedulerService;
-  private ZeppelinhubSession zeppelinhubSession;
+  private Map<String, ZeppelinhubSession> sessionMap = 
+      new ConcurrentHashMap<String, ZeppelinhubSession>();
 
   public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) {
     if (instance == null) {
@@ -82,7 +84,6 @@ public class ZeppelinhubClient {
   private ZeppelinhubClient(String url, String token) {
     zeppelinhubWebsocketUrl = URI.create(url);
     client = createNewWebsocketClient();
-    conectionRequest = setConnectionrequest(token);
     zeppelinhubToken = token;
     schedulerService = SchedulerService.create(10);
     gson = new Gson();
@@ -92,17 +93,19 @@ public class ZeppelinhubClient {
   public void start() {
     try {
       client.start();
-      zeppelinhubSession = connect();
       addRoutines();
     } catch (Exception e) {
       LOG.error("Cannot connect to zeppelinhub via websocket", e);
     }
   }
   
+  public void initUser(String token) {
+    
+  }
+
   public void stop() {
     LOG.info("Stopping Zeppelinhub websocket client");
     try {
-      zeppelinhubSession.close();
       schedulerService.close();
       client.stop();
     } catch (Exception e) {
@@ -110,14 +113,19 @@ public class ZeppelinhubClient {
     }
   }
 
+  public void stopUser(String token) {
+    removeSession(token);
+  }
+
   public String getToken() {
     return this.zeppelinhubToken;
   }
   
-  public void send(String msg) {
-    if (!isConnectedToZeppelinhub()) {
+  public void send(String msg, String token) {
+    ZeppelinhubSession zeppelinhubSession = getSession(token);
+    if (!isConnectedToZeppelinhub(zeppelinhubSession)) {
       LOG.info("Zeppelinhub connection is not open, opening it");
-      zeppelinhubSession = connect();
+      zeppelinhubSession = connect(token);
       if (zeppelinhubSession == ZeppelinhubSession.EMPTY) {
         LOG.warn("While connecting to ZeppelinHub received empty session, cannot send the message");
         return;
@@ -126,25 +134,48 @@ public class ZeppelinhubClient {
     zeppelinhubSession.sendByFuture(msg);
   }
   
-  private boolean isConnectedToZeppelinhub() {
+  private boolean isConnectedToZeppelinhub(ZeppelinhubSession zeppelinhubSession) {
     return (zeppelinhubSession != null && zeppelinhubSession.isSessionOpen());
   }
 
-  private ZeppelinhubSession connect() {
-    ZeppelinhubSession zeppelinSession;
+  private ZeppelinhubSession connect(String token) {
+    if (StringUtils.isBlank(token)) {
+      LOG.debug("Can't connect with empty token");
+      return ZeppelinhubSession.EMPTY;
+    }
+    ZeppelinhubSession zeppelinhubSession;
     try {
-      ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(zeppelinhubToken);
-      Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, conectionRequest);
+      ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(token);
+      ClientUpgradeRequest request = getConnectionRequest(token);
+      Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, request);
       Session session = future.get();
-      zeppelinSession = ZeppelinhubSession.createInstance(session, zeppelinhubToken);
+      zeppelinhubSession = ZeppelinhubSession.createInstance(session, token);
+      setSession(token, zeppelinhubSession);
     } catch (IOException | InterruptedException | ExecutionException e) {
-      LOG.info("Couldnt connect to zeppelinhub - {}", e.toString());
-      zeppelinSession = ZeppelinhubSession.EMPTY;
+      LOG.info("Couldnt connect to zeppelinhub", e);
+      zeppelinhubSession = ZeppelinhubSession.EMPTY;
     }
-    return zeppelinSession;
+    return zeppelinhubSession;
   }
-  
-  private ClientUpgradeRequest setConnectionrequest(String token) {
+
+  private void setSession(String token, ZeppelinhubSession session) {
+    sessionMap.put(token, session);
+  }
+
+  private ZeppelinhubSession getSession(String token) {
+    return sessionMap.get(token);
+  }
+
+  public void removeSession(String token) {
+    ZeppelinhubSession zeppelinhubSession = getSession(token);
+    if (zeppelinhubSession == null) {
+      return;
+    }
+    zeppelinhubSession.close();
+    sessionMap.remove(token);
+  }
+
+  private ClientUpgradeRequest getConnectionRequest(String token) {
     ClientUpgradeRequest request = new ClientUpgradeRequest();
     request.setCookies(Lists.newArrayList(new HttpCookie(ZeppelinHubRepo.TOKEN_HEADER, token)));
     return request;
@@ -193,7 +224,7 @@ public class ZeppelinhubClient {
           runAllParagraph(hubMsg.meta.get("noteId"), msg);
           break;
         default:
-          LOG.warn("Received {} from ZeppelinHub, not handled", op);
+          LOG.debug("Received {} from ZeppelinHub, not handled", op);
           break;
     }
   }
@@ -206,6 +237,8 @@ public class ZeppelinhubClient {
       return;
     }
     zeppelinMsg.data = (Map<String, Object>) hubMsg.data;
+    zeppelinMsg.principal = hubMsg.meta.get("owner");
+    zeppelinMsg.ticket = TicketContainer.instance.getTicket(zeppelinMsg.principal);
     Client client = Client.getInstance();
     if (client == null) {
       LOG.warn("Base client isn't initialized, returning");
@@ -230,6 +263,7 @@ public class ZeppelinhubClient {
       Message zeppelinMsg = new Message(OP.RUN_PARAGRAPH);
 
       JSONArray paragraphs = data.getJSONArray("data");
+      String principal = data.getJSONObject("meta").getString("owner");
       for (int i = 0; i < paragraphs.length(); i++) {
         if (!(paragraphs.get(i) instanceof JSONObject)) {
           LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK");
@@ -237,6 +271,8 @@ public class ZeppelinhubClient {
         }
         zeppelinMsg.data = gson.fromJson(paragraphs.getString(i), 
             new TypeToken<Map<String, Object>>(){}.getType());
+        zeppelinMsg.principal = principal;
+        zeppelinMsg.ticket = TicketContainer.instance.getTicket(principal);
         client.relayToZeppelin(zeppelinMsg, noteId);
         LOG.info("\nSending RUN_PARAGRAPH message to Zeppelin ");
       }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
index 5ccacb9..95a1e77 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
@@ -21,6 +21,7 @@ import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.ticket.TicketContainer;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketListener;
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import com.google.gson.Gson;
 public class WatcherWebsocket implements WebSocketListener {
   private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
   private static final Gson GSON = new Gson();
+  private static final String watcherPrincipal = "watcher";
   public Session connection;
   
   public static WatcherWebsocket createInstace() {
@@ -54,7 +56,10 @@ public class WatcherWebsocket implements WebSocketListener {
   public void onWebSocketConnect(Session session) {
     LOG.info("WatcherWebsocket connection opened");
     this.connection = session;
-    session.getRemote().sendStringByFuture(GSON.toJson(new Message(OP.WATCHER)));
+    Message watcherMsg = new Message(OP.WATCHER);
+    watcherMsg.principal = watcherPrincipal;
+    watcherMsg.ticket = TicketContainer.instance.getTicket(watcherPrincipal);
+    session.getRemote().sendStringByFuture(GSON.toJson(watcherMsg));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
index f982365..2282147 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
@@ -16,6 +16,7 @@
  */
 package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
 
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
 import org.slf4j.Logger;
@@ -39,7 +40,9 @@ public class ZeppelinHubHeartbeat implements Runnable {
   
   @Override
   public void run() {
-    LOG.debug("Sending PING to zeppelinhub");
-    client.send(ZeppelinhubUtils.pingMessage(client.getToken()));
+    LOG.debug("Sending PING to zeppelinhub token");
+    for (String token: UserTokenContainer.getInstance().getAllTokens()) {
+      client.send(ZeppelinhubUtils.pingMessage(token), token);
+    }
   }  
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
index bc5e2dd..b88fd72 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils;
 import java.util.HashMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
 import org.apache.zeppelin.notebook.socket.Message;
@@ -95,4 +97,32 @@ public class ZeppelinhubUtils {
   public static boolean isZeppelinOp(String text) {
     return (toZeppelinOp(text) != null); 
   }
+  
+  public static void userLoginRoutine(String username) {
+    LOG.debug("Executing user login routine");
+    String token = UserTokenContainer.getInstance().getUserToken(username);
+    UserTokenContainer.getInstance().setUserToken(username, token);
+    String msg = ZeppelinhubUtils.liveMessage(token);
+    ZeppelinhubClient.getInstance()
+        .send(msg, token);
+  }
+  
+  public static void userLogoutRoutine(String username) {
+    LOG.debug("Executing user logout routine");
+    String token = UserTokenContainer.getInstance().removeUserToken(username);
+    String msg = ZeppelinhubUtils.deadMessage(token);
+    ZeppelinhubClient.getInstance()
+        .send(msg, token);
+    ZeppelinhubClient.getInstance().removeSession(token);
+  }
+  
+  public static void userSwitchTokenRoutine(String username, String originToken,
+      String targetToken) {
+    String offMsg = ZeppelinhubUtils.deadMessage(originToken);
+    ZeppelinhubClient.getInstance().send(offMsg, originToken);
+    ZeppelinhubClient.getInstance().removeSession(originToken);
+    
+    String onMsg = ZeppelinhubUtils.liveMessage(targetToken);
+    ZeppelinhubClient.getInstance().send(onMsg, targetToken);
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
new file mode 100644
index 0000000..513bb4a
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/ticket/TicketContainer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ticket;
+
+import java.util.Calendar;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Very simple ticket container
+ * No cleanup is done, since the same user accross different devices share the same ticket
+ * The Map size is at most the number of different user names having access to a Zeppelin instance
+ */
+
+
+public class TicketContainer {
+  private static class Entry {
+    public final String ticket;
+    // lastAccessTime still unused
+    public final long lastAccessTime;
+
+    Entry(String ticket) {
+      this.ticket = ticket;
+      this.lastAccessTime = Calendar.getInstance().getTimeInMillis();
+    }
+  }
+
+  private Map<String, Entry> sessions = new ConcurrentHashMap<>();
+
+  public static final TicketContainer instance = new TicketContainer();
+
+  /**
+   * For test use
+   * @param principal
+   * @param ticket
+   * @return true if ticket assigned to principal.
+   */
+  public boolean isValid(String principal, String ticket) {
+    if ("anonymous".equals(principal) && "anonymous".equals(ticket))
+      return true;
+    Entry entry = sessions.get(principal);
+    return entry != null && entry.ticket.equals(ticket);
+  }
+
+  /**
+   * get or create ticket for Websocket authentication assigned to authenticated shiro user
+   * For unathenticated user (anonymous), always return ticket value "anonymous"
+   * @param principal
+   * @return
+   */
+  public synchronized String getTicket(String principal) {
+    Entry entry = sessions.get(principal);
+    String ticket;
+    if (entry == null) {
+      if (principal.equals("anonymous"))
+        ticket = "anonymous";
+      else
+        ticket = UUID.randomUUID().toString();
+    } else {
+      ticket = entry.ticket;
+    }
+    entry = new Entry(ticket);
+    sessions.put(principal, entry);
+    return ticket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7ad6108/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
index 746e775..b8e52e4 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
@@ -62,25 +62,25 @@ public class ZeppelinClientTest {
     LOG.info("Zeppelin websocket client started");
 
     // Connection to note AAAA
-    Session connectionA = client.getZeppelinConnection("AAAA");
+    Session connectionA = client.getZeppelinConnection("AAAA", "anonymous", "anonymous");
     assertNotNull(connectionA);
     assertTrue(connectionA.isOpen());
 
     assertEquals(client.countConnectedNotes(), 1);
-    assertEquals(connectionA, client.getZeppelinConnection("AAAA"));
+    assertEquals(connectionA, client.getZeppelinConnection("AAAA", "anonymous", "anonymous"));
 
     // Connection to note BBBB
-    Session connectionB = client.getZeppelinConnection("BBBB");
+    Session connectionB = client.getZeppelinConnection("BBBB", "anonymous", "anonymous");
     assertNotNull(connectionB);
     assertTrue(connectionB.isOpen());
 
     assertEquals(client.countConnectedNotes(), 2);
-    assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
+    assertEquals(connectionB, client.getZeppelinConnection("BBBB", "anonymous", "anonymous"));
 
     // Remove connection to note AAAA
     client.removeNoteConnection("AAAA");
     assertEquals(client.countConnectedNotes(), 1);
-    assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
+    assertNotEquals(connectionA, client.getZeppelinConnection("AAAA", "anonymous", "anonymous"));
     assertEquals(client.countConnectedNotes(), 2);
     client.stop();
   }


Mime
View raw message