apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [10/11] incubator-apex-core git commit: APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.
Date Fri, 28 Aug 2015 22:02:26 GMT
APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aceaeebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aceaeebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aceaeebe

Branch: refs/heads/devel-3
Commit: aceaeebe2eda27337bc2dc38f71c11156f983bdb
Parents: 7488444
Author: Timothy Farkas <tim@datatorrent.com>
Authored: Mon Aug 3 10:29:22 2015 -0700
Committer: David Yan <david@datatorrent.com>
Committed: Fri Aug 28 14:19:42 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java                     | 437 -------------------
 engine/pom.xml                                  |   1 -
 .../stram/util/PubSubWebSocketServlet.java      | 376 ++++++++++++++++
 3 files changed, 376 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
deleted file mode 100644
index b039f44..0000000
--- a/PubSubWebSocketServlet.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- *  Copyright (c) 2012-2013 DataTorrent, Inc.
- *  All Rights Reserved.
- */
-package com.datatorrent.gateway;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import javax.servlet.FilterChain;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.common.util.PubSubMessage;
-import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.common.util.PubSubMessageCodec;
-
-import com.datatorrent.gateway.security.AuthDatabase;
-import com.datatorrent.gateway.security.AuthenticationException;
-import com.datatorrent.gateway.security.DTPrincipal;
-import com.datatorrent.stram.util.JSONSerializationProvider;
-import com.datatorrent.stram.util.LRUCache;
-
-
-/**
- * <p>PubSubWebSocketServlet class.</p>
- *
- * @author David Yan <david@datatorrent.com>
- * @since 0.3.2
- */
-public class PubSubWebSocketServlet extends WebSocketServlet
-{
-  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
-  private static final long serialVersionUID = 1L;
-  private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String,
HashSet<PubSubWebSocket>>();
-  private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket,
HashSet<String>>();
-  private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
-  private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
-  private InternalMessageHandler internalMessageHandler = null;
-  private static final int latestTopicCount = 100;
-  private final DTGateway gateway;
-  private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal";
-  private SubscribeFilter subscribeFilter;
-  private SendFilter sendFilter;
-  private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount,
false)
-  {
-    private static final long serialVersionUID = 20140131L;
-
-    @Override
-    public Long put(String key, Long value)
-    {
-      remove(key); // this is to make the key the most recently inserted entry
-      return super.put(key, value);
-    }
-
-  };
-
-  public interface SubscribeFilter
-  {
-
-    /**
-     * Returns whether or not the principal is allowed to subscribe to this topic
-     *
-     * @param gateway
-     * @param principal
-     * @param topic
-     * @return
-     */
-    boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
-  }
-
-  public interface SendFilter
-  {
-
-    /**
-     * Returns the data it should be sent given the principal
-     *
-     * @param gateway
-     * @param principal
-     * @param topic
-     * @param data
-     * @return the data it should send to the websocket
-     */
-    Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data);
-  }
-
-  public void registerSubscribeFilter(SubscribeFilter filter)
-  {
-    subscribeFilter = filter;
-  }
-
-  public void registerSendFilter(SendFilter filter)
-  {
-    sendFilter = filter;
-  }
-
-  public interface InternalMessageHandler
-  {
-    void onMessage(String topic, Object data);
-
-  }
-
-  public PubSubWebSocketServlet(DTGateway gateway)
-  {
-    this.gateway = gateway;
-  }
-
-  public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
-  {
-    this.internalMessageHandler = internalMessageHandler;
-  }
-
-  @Override
-  protected void service(HttpServletRequest request, HttpServletResponse response) throws
ServletException, IOException
-  {
-    boolean handled = false;
-    DTPrincipal principal = null;
-    AuthDatabase auth = gateway.getAuthDatabase();
-    if (gateway.isDTSessionHandled()) {
-      //if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
-        Cookie[] cookies = request.getCookies();
-        if (cookies != null) {
-          for (Cookie cookie : cookies) {
-            if ("session".equals(cookie.getName())) {
-              try {
-                principal = auth.authenticateSession(cookie.getValue());
-                //request.setAttribute(AUTH_ATTRIBUTE, principal);
-              } catch (AuthenticationException ex) {
-              /* commenting this out to allow anonymous publish from stram
-               throw new WebApplicationException(ex, Status.FORBIDDEN);
-               */
-              }
-              //super.service(request, response);
-            }
-          }
-        }
-      /* commenting this out to allow anonymous publish from stram
-       throw new WebApplicationException(Status.UNAUTHORIZED);
-       */
-      //}
-    } else if (gateway.isHadoopAuthFilterHandled()) {
-      final UserHolder userHolder = new UserHolder();
-      gateway.getHadoopAuthFilter().doFilter(request, response, new FilterChain()
-      {
-        @Override
-        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
throws IOException, ServletException
-        {
-          userHolder.username = ((HttpServletRequest)servletRequest).getUserPrincipal().getName();
-        }
-      });
-      if (response.getStatus() == HttpServletResponse.SC_OK) {
-        principal = auth.getUser(userHolder.username);
-      } else {
-        handled = true;
-      }
-    }
-    if (!handled) {
-      if (principal != null) {
-        request.setAttribute(AUTH_ATTRIBUTE, principal);
-      }
-      super.service(request, response);
-    }
-  }
-
-  private class UserHolder {
-    public String username;
-  }
-
-  @Override
-  public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
-  {
-    DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
-    return new PubSubWebSocket(principal);
-  }
-
-  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
-  {
-    if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(),
topic)) {
-      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe
request", topic, webSocket.getPrincipal());
-      return;
-    }
-    else {
-      LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
-    }
-
-    HashSet<PubSubWebSocket> wsSet;
-    if (!topicToSocketMap.containsKey(topic)) {
-      wsSet = new HashSet<PubSubWebSocket>();
-      topicToSocketMap.put(topic, wsSet);
-    }
-    else {
-      wsSet = topicToSocketMap.get(topic);
-    }
-    wsSet.add(webSocket);
-
-    HashSet<String> topicSet;
-    if (!socketToTopicMap.containsKey(webSocket)) {
-      topicSet = new HashSet<String>(0);
-      socketToTopicMap.put(webSocket, topicSet);
-    }
-    else {
-      topicSet = socketToTopicMap.get(webSocket);
-    }
-    topicSet.add(topic);
-    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-  }
-
-  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
-  {
-    if (!topicToSocketMap.containsKey(topic)) {
-      return;
-    }
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    wsSet.remove(webSocket);
-    if (wsSet.isEmpty()) {
-      topicToSocketMap.remove(topic);
-    }
-    if (!socketToTopicMap.containsKey(webSocket)) {
-      return;
-    }
-    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
-    topicSet.remove(topic);
-    if (topicSet.isEmpty()) {
-      socketToTopicMap.remove(webSocket);
-    }
-    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-  }
-
-  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
-  {
-    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
-    if (topicSet != null) {
-      for (String topic : topicSet) {
-        HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-        wsSet.remove(webSocket);
-        if (wsSet.isEmpty()) {
-          topicToSocketMap.remove(topic);
-        }
-        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-      }
-      socketToTopicMap.remove(webSocket);
-    }
-  }
-
-  private synchronized void disconnect(PubSubWebSocket webSocket)
-  {
-    unsubscribeAll(webSocket);
-  }
-
-  public synchronized int getNumSubscribers(String topic)
-  {
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    return wsSet == null ? 0 : wsSet.size();
-  }
-
-  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data)
throws IOException
-  {
-    PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
-    pubSubMessage.setType(PubSubMessageType.DATA);
-    pubSubMessage.setTopic(topic);
-    pubSubMessage.setData(data);
-    LOG.debug("Sending data {} to subscriber...", topic);
-    webSocket.sendMessage(codec.formatMessage(pubSubMessage));
-  }
-
-  public synchronized void publish(String topic, Object data)
-  {
-    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX
+ ".")) {
-      latestTopics.put(topic, System.currentTimeMillis());
-    }
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    if (wsSet != null) {
-      Iterator<PubSubWebSocket> it = wsSet.iterator();
-      while (it.hasNext()) {
-        PubSubWebSocket socket = it.next();
-        try {
-          if (sendFilter != null) {
-            Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic,
data);
-            sendData(socket, topic, filteredData);
-          }
-          else {
-            sendData(socket, topic, data);
-          }
-        }
-        catch (Exception ex) {
-          LOG.error("Cannot send message", ex);
-          it.remove();
-          disconnect(socket);
-        }
-      }
-    }
-  }
-
-  private class PubSubWebSocket implements WebSocket.OnTextMessage
-  {
-    private Connection connection;
-    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
-    private final Thread messengerThread = new Thread(new Messenger());
-    private final DTPrincipal principal;
-
-    public PubSubWebSocket(DTPrincipal principal)
-    {
-      this.principal = principal;
-    }
-
-    public DTPrincipal getPrincipal()
-    {
-      return principal;
-    }
-
-    @Override
-    public void onMessage(String message)
-    {
-      LOG.debug("Received message {}", message);
-      try {
-        @SuppressWarnings("unchecked")
-        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
-        if (pubSubMessage != null) {
-          PubSubMessageType type = pubSubMessage.getType();
-          String topic = pubSubMessage.getTopic();
-          if (type != null) {
-            if (type.equals(PubSubMessageType.SUBSCRIBE)) {
-              if (topic != null) {
-                subscribe(this, topic);
-              }
-            }
-            else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
-              if (topic != null) {
-                unsubscribe(this, topic);
-              }
-            }
-            else if (type.equals(PubSubMessageType.PUBLISH)) {
-              if (topic != null) {
-                Object data = pubSubMessage.getData();
-                if (data != null) {
-                  publish(topic, data);
-                }
-                if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
-                  if (internalMessageHandler != null) {
-                    internalMessageHandler.onMessage(topic, data);
-                  }
-                }
-              }
-            }
-            else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
-              if (topic != null) {
-                subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
-                sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-              }
-            }
-            else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
-              if (topic != null) {
-                unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
-              }
-            }
-            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
-              synchronized (this) {
-                sendData(this, "_latestTopics", latestTopics.keySet());
-              }
-            }
-          }
-        }
-      }
-      catch (Exception ex) {
-        LOG.warn("Exception caught", ex);
-      }
-    }
-
-    @Override
-    public void onOpen(Connection connection)
-    {
-      LOG.debug("onOpen");
-      this.connection = connection;
-      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear
out idle connections from taking resources
-      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
-      messengerThread.start();
-    }
-
-    @Override
-    public void onClose(int i, String string)
-    {
-      LOG.debug("onClose");
-      disconnect(this);
-      messengerThread.interrupt();
-    }
-
-    public void sendMessage(String message) throws IllegalStateException
-    {
-      messageQueue.add(message);
-    }
-
-    /*
-     * This class exists only because Jetty 8 does not support async write for websocket
-     *
-     */
-    private class Messenger implements Runnable
-    {
-      @Override
-      public void run()
-      {
-        while (!Thread.interrupted()) {
-          try {
-            String message = messageQueue.take();
-            // This call sendMessage() is blocking. This is why we have this messenger thread
per connection so that one bad connection will not affect another
-            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java
7
-            // When we can use Java 7, we need to upgrade to Jetty 9.
-            connection.sendMessage(message);
-          }
-          catch (InterruptedException ex) {
-            return;
-          }
-          catch (Exception ex) {
-            LOG.error("Caught exception in websocket messenger.", ex);
-            return;
-          }
-        }
-      }
-
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 21eaa00..2b8f66a 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -272,7 +272,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-websocket</artifactId>
       <version>${jetty.version}</version>
-      <scope>test</scope>
     </dependency>
     <!-- use shaded asm library to avoid conflict -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
new file mode 100644
index 0000000..d8c3df8
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
@@ -0,0 +1,376 @@
+/*
+ *  Copyright (c) 2012-2013 DataTorrent, Inc.
+ *  All Rights Reserved.
+ */
+package com.datatorrent.stram.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocket.Connection;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.util.PubSubMessage;
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.common.util.PubSubMessageCodec;
+
+
+/**
+ * <p>PubSubWebSocketServlet class.</p>
+ *
+ * @author David Yan <david@datatorrent.com>
+ * @since 0.3.2
+ */
+public class PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL> extends WebSocketServlet
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
+  private static final long serialVersionUID = 1L;
+  private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String,
HashSet<PubSubWebSocket>>();
+  private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket,
HashSet<String>>();
+  private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
+  private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
+  private InternalMessageHandler internalMessageHandler = null;
+  private static final int latestTopicCount = 100;
+  protected SECURITY_CONTEXT securityContext;
+  private SubscribeFilter subscribeFilter;
+  private SendFilter sendFilter;
+  private String authAttribute;
+  private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount,
false)
+  {
+    private static final long serialVersionUID = 20140131L;
+
+    @Override
+    public Long put(String key, Long value)
+    {
+      remove(key); // this is to make the key the most recently inserted entry
+      return super.put(key, value);
+    }
+
+  };
+
+  public interface SubscribeFilter<SECURITY_CONTEXT, PRINCIPAL>
+  {
+
+    /**
+     * Returns whether or not the principal is allowed to subscribe to this topic
+     *
+     * @param securityContext
+     * @param principal
+     * @param topic
+     * @return
+     */
+    boolean filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic);
+  }
+
+  public interface SendFilter<SECURITY_CONTEXT, PRINCIPAL>
+  {
+
+    /**
+     * Returns the data it should be sent given the principal
+     *
+     * @param securityContext
+     * @param principal
+     * @param topic
+     * @param data
+     * @return the data it should send to the websocket
+     */
+    Object filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic, Object
data);
+  }
+
+  public void registerSubscribeFilter(SubscribeFilter filter)
+  {
+    subscribeFilter = filter;
+  }
+
+  public void registerSendFilter(SendFilter filter)
+  {
+    sendFilter = filter;
+  }
+
+  public interface InternalMessageHandler
+  {
+    void onMessage(String topic, Object data);
+
+  }
+
+  public PubSubWebSocketServlet(SECURITY_CONTEXT securityContext, String authAttribute)
+  {
+    this.securityContext = securityContext;
+    this.authAttribute = authAttribute;
+  }
+
+  public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
+  {
+    this.internalMessageHandler = internalMessageHandler;
+  }
+
+  public class UserHolder {
+    public String username;
+  }
+
+  @Override
+  public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
+  {
+    @SuppressWarnings("unchecked")
+    PRINCIPAL principal = (PRINCIPAL) request.getAttribute(authAttribute);
+    return new PubSubWebSocket(principal);
+  }
+
+  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
+  {
+    if (subscribeFilter != null && !subscribeFilter.filter(securityContext, webSocket.getPrincipal(),
topic)) {
+      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe
request", topic, webSocket.getPrincipal());
+      return;
+    }
+    else {
+      LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+    }
+
+    HashSet<PubSubWebSocket> wsSet;
+    if (!topicToSocketMap.containsKey(topic)) {
+      wsSet = new HashSet<PubSubWebSocket>();
+      topicToSocketMap.put(topic, wsSet);
+    }
+    else {
+      wsSet = topicToSocketMap.get(topic);
+    }
+    wsSet.add(webSocket);
+
+    HashSet<String> topicSet;
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      topicSet = new HashSet<String>(0);
+      socketToTopicMap.put(webSocket, topicSet);
+    }
+    else {
+      topicSet = socketToTopicMap.get(webSocket);
+    }
+    topicSet.add(topic);
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
+  {
+    if (!topicToSocketMap.containsKey(topic)) {
+      return;
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    wsSet.remove(webSocket);
+    if (wsSet.isEmpty()) {
+      topicToSocketMap.remove(topic);
+    }
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      return;
+    }
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    topicSet.remove(topic);
+    if (topicSet.isEmpty()) {
+      socketToTopicMap.remove(webSocket);
+    }
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
+  {
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    if (topicSet != null) {
+      for (String topic : topicSet) {
+        HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+        wsSet.remove(webSocket);
+        if (wsSet.isEmpty()) {
+          topicToSocketMap.remove(topic);
+        }
+        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+      }
+      socketToTopicMap.remove(webSocket);
+    }
+  }
+
+  private synchronized void disconnect(PubSubWebSocket webSocket)
+  {
+    unsubscribeAll(webSocket);
+  }
+
+  public synchronized int getNumSubscribers(String topic)
+  {
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    return wsSet == null ? 0 : wsSet.size();
+  }
+
+  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data)
throws IOException
+  {
+    PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
+    pubSubMessage.setType(PubSubMessageType.DATA);
+    pubSubMessage.setTopic(topic);
+    pubSubMessage.setData(data);
+    LOG.debug("Sending data {} to subscriber...", topic);
+    webSocket.sendMessage(codec.formatMessage(pubSubMessage));
+  }
+
+  public synchronized void publish(String topic, Object data)
+  {
+    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX
+ ".")) {
+      latestTopics.put(topic, System.currentTimeMillis());
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    if (wsSet != null) {
+      Iterator<PubSubWebSocket> it = wsSet.iterator();
+      while (it.hasNext()) {
+        PubSubWebSocket socket = it.next();
+        try {
+          if (sendFilter != null) {
+            Object filteredData = sendFilter.filter(securityContext, socket.getPrincipal(),
topic, data);
+            sendData(socket, topic, filteredData);
+          }
+          else {
+            sendData(socket, topic, data);
+          }
+        }
+        catch (Exception ex) {
+          LOG.error("Cannot send message", ex);
+          it.remove();
+          disconnect(socket);
+        }
+      }
+    }
+  }
+
+  protected class PubSubWebSocket implements WebSocket.OnTextMessage
+  {
+    private Connection connection;
+    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
+    private final Thread messengerThread = new Thread(new Messenger());
+    private final PRINCIPAL principal;
+
+    public PubSubWebSocket(PRINCIPAL principal)
+    {
+      this.principal = principal;
+    }
+
+    public PRINCIPAL getPrincipal()
+    {
+      return principal;
+    }
+
+    @Override
+    public void onMessage(String message)
+    {
+      LOG.debug("Received message {}", message);
+      try {
+        @SuppressWarnings("unchecked")
+        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
+        if (pubSubMessage != null) {
+          PubSubMessageType type = pubSubMessage.getType();
+          String topic = pubSubMessage.getTopic();
+          if (type != null) {
+            if (type.equals(PubSubMessageType.SUBSCRIBE)) {
+              if (topic != null) {
+                subscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
+              if (topic != null) {
+                unsubscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.PUBLISH)) {
+              if (topic != null) {
+                Object data = pubSubMessage.getData();
+                if (data != null) {
+                  publish(topic, data);
+                }
+                if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+                  if (internalMessageHandler != null) {
+                    internalMessageHandler.onMessage(topic, data);
+                  }
+                }
+              }
+            }
+            else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+                sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+              }
+            }
+            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+              synchronized (this) {
+                sendData(this, "_latestTopics", latestTopics.keySet());
+              }
+            }
+          }
+        }
+      }
+      catch (Exception ex) {
+        LOG.warn("Exception caught", ex);
+      }
+    }
+
+    @Override
+    public void onOpen(Connection connection)
+    {
+      LOG.debug("onOpen");
+      this.connection = connection;
+      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear
out idle connections from taking resources
+      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
+      messengerThread.start();
+    }
+
+    @Override
+    public void onClose(int i, String string)
+    {
+      LOG.debug("onClose");
+      disconnect(this);
+      messengerThread.interrupt();
+    }
+
+    public void sendMessage(String message) throws IllegalStateException
+    {
+      messageQueue.add(message);
+    }
+
+    /*
+     * This class exists only because Jetty 8 does not support async write for websocket
+     *
+     */
+    private class Messenger implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        while (!Thread.interrupted()) {
+          try {
+            String message = messageQueue.take();
+            // This call sendMessage() is blocking. This is why we have this messenger thread
per connection so that one bad connection will not affect another
+            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java
7
+            // When we can use Java 7, we need to upgrade to Jetty 9.
+            connection.sendMessage(message);
+          }
+          catch (InterruptedException ex) {
+            return;
+          }
+          catch (Exception ex) {
+            LOG.error("Caught exception in websocket messenger.", ex);
+            return;
+          }
+        }
+      }
+
+    }
+
+  }
+
+}


Mime
View raw message