apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject [1/3] apex-core git commit: APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers
Date Tue, 13 Dec 2016 00:04:06 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master cc79b0c02 -> a9e4e053b


APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d28c0ddd
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d28c0ddd
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d28c0ddd

Branch: refs/heads/master
Commit: d28c0ddd9d259855c1d9ba623bfb970342bc40c4
Parents: c97dd7c
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Fri Dec 2 19:08:49 2016 -0800
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Fri Dec 2 19:08:49 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/bufferserver/server/Server.java | 259 ++++++++-----------
 1 file changed, 109 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d28c0ddd/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 12eed5f..e720248 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -113,14 +113,27 @@ public class Server implements ServerListener
   @Override
   public void unregistered(SelectionKey key)
   {
-    serverHelperExecutor.shutdown();
-    storageHelperExecutor.shutdown();
-    try {
-      serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException ex) {
-      logger.debug("Executor Termination", ex);
+    for (LogicalNode ln : subscriberGroups.values()) {
+      ln.boot(eventloop);
     }
-    logger.info("Server stopped listening at {}", address);
+    /*
+     * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor.
+     */
+    eventloop.submit(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        serverHelperExecutor.shutdown();
+        storageHelperExecutor.shutdown();
+        try {
+          serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          logger.debug("Executor Termination", ex);
+        }
+        logger.info("Server stopped listening at {}", address);
+      }
+    });
   }
 
   public synchronized InetSocketAddress run(EventLoop eventloop)
@@ -165,13 +178,12 @@ public class Server implements ServerListener
   @Override
   public String toString()
   {
-    return identity;
+    return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + address + "}";
   }
 
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
   private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
 
@@ -235,81 +247,70 @@ public class Server implements ServerListener
   /**
    *
    * @param request
-   * @param connection
-   * @return
+   * @param key
    */
-  public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
-      final AbstractLengthPrependerClient connection)
+  public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key)
   {
-    String identifier = request.getIdentifier();
-    String type = request.getStreamType();
-    String upstream_identifier = request.getUpstreamIdentifier();
-
-    // Check if there is a logical node of this type, if not create it.
-    final LogicalNode ln;
-    if (subscriberGroups.containsKey(type)) {
-      //logger.debug("adding to exiting group = {}", subscriberGroups.get(type));
-      /*
-       * close previous connection with the same identifier which is guaranteed to be unique.
-       */
-      AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection);
-      if (previous != null) {
-        eventloop.disconnect(previous);
-      }
-
-      ln = subscriberGroups.get(type);
+    try {
       serverHelperExecutor.submit(new Runnable()
       {
         @Override
         public void run()
         {
-          ln.boot(eventloop);
-          ln.addConnection(connection);
-          ln.catchUp();
-        }
-      });
-    } else {
-      /*
-       * if there is already a datalist registered for the type in which this client is interested,
-       * then get a iterator on the data items of that data list. If the datalist is not
registered,
-       * then create one and register it. Hopefully this one would be used by future upstream
nodes.
-       */
-      final DataList dl;
-      if (publisherBuffers.containsKey(upstream_identifier)) {
-        dl = publisherBuffers.get(upstream_identifier);
-        //logger.debug("old list = {}", dl);
-      } else {
-        dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
-            new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) :
-            new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
-        publisherBuffers.put(upstream_identifier, dl);
-        //logger.debug("new list = {}", dl);
-      }
+          final String upstream_identifier = request.getUpstreamIdentifier();
+
+        /*
+         * if there is already a datalist registered for the type in which this client is interested,
+         * then get a iterator on the data items of that data list. If the datalist is not registered,
+         * then create one and register it. Hopefully this one would be used by future upstream nodes.
+         */
+          DataList dl = publisherBuffers.get(upstream_identifier);
+          if (dl == null) {
+            dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
+                new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) :
+                new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
+            DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl);
+            if (odl != null) {
+              dl = odl;
+            }
+          }
 
-      long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId();
-      ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId);
+          final String identifier = request.getIdentifier();
+          final String type = request.getStreamType();
+          final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId();
+          final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl
+              .newIterator(skipWindowId), skipWindowId);
 
-      int mask = request.getMask();
-      if (mask != 0) {
-        for (Integer bs : request.getPartitions()) {
-          ln.addPartition(bs, mask);
-        }
-      }
+          int mask = request.getMask();
+          if (mask != 0) {
+            for (Integer bs : request.getPartitions()) {
+              ln.addPartition(bs, mask);
+            }
+          }
+          final LogicalNode oln = subscriberGroups.put(type, ln);
+          if (oln != null) {
+            oln.boot(eventloop);
+          }
+          AbstractLengthPrependerClient subscriber = new Subscriber(ln, request.getBufferSize());
+
+          subscriber.registered(key);
+          key.attach(subscriber);
+          key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
 
-      subscriberGroups.put(type, ln);
-      serverHelperExecutor.submit(new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          ln.addConnection(connection);
           ln.catchUp();
           dl.addDataListener(ln);
         }
       });
+    } catch (RejectedExecutionException e) {
+      logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e);
+      if (key.isValid()) {
+        try {
+          key.channel().close();
+        } catch (IOException ioe) {
+          logger.error("Failed to close channel {}", key.channel(), ioe);
+        }
+      }
     }
-
-    return ln;
   }
 
   /**
@@ -322,9 +323,9 @@ public class Server implements ServerListener
   {
     String identifier = request.getIdentifier();
 
-    DataList dl;
+    DataList dl = publisherBuffers.get(identifier);
 
-    if (publisherBuffers.containsKey(identifier)) {
+    if (dl != null) {
       /*
        * close previous connection with the same identifier which is guaranteed to be unique.
        */
@@ -333,7 +334,6 @@ public class Server implements ServerListener
         eventloop.disconnect(previous);
       }
 
-      dl = publisherBuffers.get(identifier);
       try {
         dl.rewind(request.getBaseSeconds(), request.getWindowId());
       } catch (IOException ie) {
@@ -343,7 +343,10 @@ public class Server implements ServerListener
       dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
           new FastDataList(identifier, blockSize, numberOfCacheBlocks) :
           new DataList(identifier, blockSize, numberOfCacheBlocks);
-      publisherBuffers.put(identifier, dl);
+      DataList odl = publisherBuffers.putIfAbsent(identifier, dl);
+      if (odl != null) {
+        dl = odl;
+      }
     }
     dl.setSecondaryStorage(storage, storageHelperExecutor);
 
@@ -468,39 +471,7 @@ public class Server implements ServerListener
           ignore = true;
           logger.info("Received subscriber request: {}", request);
 
-          SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
-          AbstractLengthPrependerClient subscriber;
-
-//          /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */
-          int bufferSize = subscriberRequest.getBufferSize();
-//          if (bufferSize == 0) {
-//            bufferSize = 16 * 1024;
-//          }
-          if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
-                subscriberRequest.getPartitions(), bufferSize);
-          } else {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
-                subscriberRequest.getPartitions(), bufferSize)
-            {
-              @Override
-              public int readSize()
-              {
-                if (writeOffset - readOffset < 2) {
-                  return -1;
-                }
-
-                short s = buffer[readOffset++];
-                return s | (buffer[readOffset++] << 8);
-              }
-
-            };
-          }
-          key.attach(subscriber);
-          key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-          subscriber.registered(key);
-
-          handleSubscriberRequest(subscriberRequest, subscriber);
+          handleSubscriberRequest((SubscribeRequestTuple)request, key);
           break;
 
         case PURGE_REQUEST:
@@ -530,16 +501,13 @@ public class Server implements ServerListener
 
   class Subscriber extends AbstractLengthPrependerClient
   {
-    private final String type;
-    private final int mask;
-    private final int[] partitions;
+    private LogicalNode ln;
 
-    Subscriber(String type, int mask, int[] partitions, int bufferSize)
+    Subscriber(LogicalNode ln, int bufferSize)
     {
       super(1024, bufferSize);
-      this.type = type;
-      this.mask = mask;
-      this.partitions = partitions;
+      this.ln = ln;
+      ln.addConnection(this);
       super.write = false;
     }
 
@@ -553,58 +521,49 @@ public class Server implements ServerListener
     @Override
     public void unregistered(final SelectionKey key)
     {
-      super.unregistered(key);
-      teardown();
-    }
-
-    @Override
-    public void handleException(Exception cce, EventLoop el)
-    {
-      teardown();
-      super.handleException(cce, el);
-    }
+      try {
+        serverHelperExecutor.submit(new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            teardown();
+          }
 
-    @Override
-    public String toString()
-    {
-      return "Server.Subscriber{" + "type=" + type + ", mask=" + mask +
-          ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}';
+          @Override
+          public String toString()
+          {
+            return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) +
+                " teardown " + Subscriber.this;
+          }
+        });
+      } catch (Exception e) {
+        logger.error("{}", this, e);
+      }
+      super.unregistered(key);
     }
 
-    private volatile boolean torndown;
-
     private void teardown()
     {
-      //logger.debug("Teardown is being called {}", torndown, new Exception());
-      if (torndown) {
-        return;
-      }
-      torndown = true;
-
-      LogicalNode ln = subscriberGroups.get(type);
       if (ln != null) {
-        if (subscriberChannels.containsValue(this)) {
-          final Iterator<Entry<String, AbstractLengthPrependerClient>> i = subscriberChannels.entrySet().iterator();
-          while (i.hasNext()) {
-            if (i.next().getValue() == this) {
-              i.remove();
-              break;
-            }
-          }
-        }
-
-        ln.removeChannel(this);
+        ln.removeChannel(Subscriber.this);
         if (ln.getPhysicalNodeCount() == 0) {
           DataList dl = publisherBuffers.get(ln.getUpstream());
           if (dl != null) {
             dl.removeDataListener(ln);
           }
-          subscriberGroups.remove(ln.getGroup());
+          subscriberGroups.remove(ln.getGroup(), ln);
+          ln.getIterator().close();
+          ln = null;
         }
-        ln.getIterator().close();
       }
     }
 
+    @Override
+    public String toString()
+    {
+      return "Server.Subscriber{" + "ln=" + ln + "}";
+    }
   }
 
   /**


Mime
View raw message