geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [09/10] geode git commit: GEODE-3416: Reduce synchronization blockages in SocketCloser. Remove synchronization blocks around HashMap. Replace that implementation with simpler ThreadPool that is not unbounded and does not grow as the number of remoteAddre
Date Fri, 11 Aug 2017 16:47:08 GMT
GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with simpler ThreadPool that is not unbounded and does not grow as the
number of remoteAddress (clients/peers) are added


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/31e4db0f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/31e4db0f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/31e4db0f

Branch: refs/heads/feature/GEODE-3416
Commit: 31e4db0f7630c4350519c29f2ce4061fc0c2b1ee
Parents: 7352fcc
Author: Dave Barnes <dbarnes@pivotal.io>
Authored: Thu Aug 10 17:11:50 2017 -0700
Committer: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Committed: Fri Aug 11 09:46:15 2017 -0700

----------------------------------------------------------------------
 .../master_middleman/bookbinder_helpers.rb      | 298 -------------------
 .../cache/tier/sockets/CacheClientProxy.java    |  51 +---
 .../apache/geode/internal/net/SocketCloser.java | 176 ++++-------
 .../apache/geode/internal/tcp/Connection.java   |   4 +-
 .../geode/internal/tcp/ConnectionTable.java     |   4 -
 .../internal/net/SocketCloserJUnitTest.java     | 155 +++-------
 6 files changed, 115 insertions(+), 573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-book/master_middleman/bookbinder_helpers.rb
----------------------------------------------------------------------
diff --git a/geode-book/master_middleman/bookbinder_helpers.rb b/geode-book/master_middleman/bookbinder_helpers.rb
deleted file mode 100644
index 817875c..0000000
--- a/geode-book/master_middleman/bookbinder_helpers.rb
+++ /dev/null
@@ -1,298 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-require 'bookbinder/code_example_reader'
-require 'bookbinder/ingest/cloner_factory'
-require 'bookbinder/ingest/git_accessor'
-require 'bookbinder/local_filesystem_accessor'
-require 'date'
-require_relative 'archive_drop_down_menu'
-require_relative 'quicklinks_renderer'
-
-I18n.enforce_available_locales = false
-
-module Bookbinder
-  class Helpers < ::Middleman::Extension
-    # class << self
-    #   def registered(app)
-    #     app.helpers HelperMethods
-    #   end
-
-    #   alias :included :registered
-    # end
-
-    module HelperMethods
-
-      def yield_for_code_snippet(from: nil, at: nil)
-        cloner_factory = Ingest::ClonerFactory.new({out: $stdout},
-                                                   LocalFilesystemAccessor.new,
-                                                   Ingest::GitAccessor.new)
-
-        cloner = cloner_factory.produce(config[:local_repo_dir])
-        code_example_reader = CodeExampleReader.new({out: $stdout},
-                                                    LocalFilesystemAccessor.new)
-        working_copy = cloner.call(source_repo_name: from,
-                                   source_ref: 'master',
-                                   destination_parent_dir: config[:workspace])
-
-        snippet, language = code_example_reader.get_snippet_and_language_at(at, working_copy)
-
-        delimiter = '```'
-
-        snippet.prepend("#{delimiter}#{language}\n").concat("\n#{delimiter}")
-      end
-
-      def elastic_search?
-        !!config[:elastic_search]
-      end
-
-      def yield_for_subnav
-        partial "subnavs/#{subnav_template_name}"
-      end
-
-      def yield_for_archive_drop_down_menu
-        menu = ArchiveDropDownMenu.new(
-          config[:archive_menu],
-          current_path: current_page.path
-        )
-        unless menu.empty?
-          partial 'archive_menus/default', locals: { menu_title: menu.title,
-                                                     dropdown_links: menu.dropdown_links
}
-        end
-      end
-
-      def exclude_feedback
-        current_page.add_metadata({page: {feedback_disabled: true}})
-      end
-
-      def yield_for_feedback
-        partial 'layouts/feedback' if config[:feedback_enabled] && !current_page.metadata[:page][:feedback_disabled]
-      end
-
-      def exclude_repo_link
-        current_page.add_metadata({page: {repo_link_disabled: true}})
-      end
-
-      def render_repo_link
-        if config[:repo_link_enabled] && repo_url && !current_page.metadata[:page][:repo_link_disabled]
-          "<a id='repo-link' href='#{repo_url}'>View the source for this page in GitHub</a>"
-        end
-      end
-
-      def mermaid_diagram(&blk)
-        escaped_text = capture(&blk).gsub('-','\-')
-
-        @_out_buf.concat "<div class='mermaid'>#{escaped_text}</div>"
-      end
-
-      def modified_date(default_date: nil)
-        parsed_default_date = Time.parse(default_date).utc if default_date
-
-        date = page_last_modified_date || parsed_default_date
-
-        "Page last updated: <span data-behavior=\"DisplayModifiedDate\" data-modified-date=\"#{date.to_i}000\"></span>"
if date
-      end
-
-      def breadcrumbs
-        page_chain = add_ancestors_of(current_page, [])
-        breadcrumbs = page_chain.map do |page|
-          make_breadcrumb(page, page == current_page)
-        end.compact
-        return if breadcrumbs.size < 2
-        return content_tag :ul, breadcrumbs.reverse.join(' '), class: 'breadcrumbs'
-      end
-
-      def vars
-        OpenStruct.new config[:template_variables]
-      end
-
-      ## Geode helpers (start)
-      def geode_product_name
-        current_page.data.title= vars.geode_product_name
-      end
-
-      def geode_product_name_long
-        current_page.data.title= vars.geode_product_name_long
-      end
-
-      def geode_product_version
-        current_page.data.title= vars.geode_product_version
-      end
-
-      def set_title(*args)
-        current_page.data.title= args.join(' ')
-      end
-      ## Geode helpers (end)
-
-      def product_info
-        config[:product_info].fetch(template_key, {})
-      end
-
-      def production_host
-        config[:production_host]
-      end
-
-      def quick_links
-        page_src = File.read(current_page.source_file)
-        quicklinks_renderer = QuicklinksRenderer.new(vars)
-        Redcarpet::Markdown.new(quicklinks_renderer).render(page_src)
-      end
-
-      def owners
-        html_resources = sitemap.resources.select { |r| r.path.end_with?('.html') }
-        html_resources.each.with_object({}) { |resource, owners|
-          owners[resource.path] = Array(resource.data['owner'])
-        }
-      end
-
-      def template_key
-        decreasingly_specific_namespaces.detect { |ns|
-          config[:subnav_templates].has_key?(ns)
-        }
-      end
-
-      def body_classes(path=current_path.dup, options={})
-        if path.is_a? Hash
-          options = path
-          path = current_path.dup
-        end
-        basename = File.basename(path)
-        dirname = File.dirname(path).gsub('.', '_')
-        page_classes(File.join(dirname, basename), options)
-      end
-
-      private
-
-      def subnav_template_name
-        config[:subnav_templates][template_key] || 'default'
-      end
-
-      def decreasingly_specific_namespaces
-        body_classes(numeric_prefix: numeric_class_prefix).
-          split(' ').reverse.drop(1).
-          map {|ns| ns.sub(/^#{numeric_class_prefix}/, '')}
-      end
-
-      def numeric_class_prefix
-        'NUMERIC_CLASS_PREFIX'
-      end
-
-      def page_last_modified_date
-        git_accessor = Ingest::GitAccessor.new
-
-        current_date = if current_page.data.dita
-          git_accessor.author_date(preprocessing_path(current_page.source_file), dita: true)
-        else
-          git_accessor.author_date(current_page.source_file)
-        end
-
-        current_date.utc if current_date
-      end
-
-      def repo_url
-        nested_dir, filename = parse_out_nested_dir_and_filename
-
-        repo_dir = match_repo_dir(nested_dir)
-        page_repo_config = config[:repo_links][repo_dir]
-
-        if page_repo_config && page_repo_config['ref']
-          org_repo = Pathname(page_repo_config['repo'])
-          ref = Pathname(page_repo_config['ref'])
-          at_path = at_path(page_repo_config)
-          nested_dir = extract_nested_directory(nested_dir, repo_dir)
-
-          "http://github.com/#{org_repo.join(Pathname('tree'), ref, Pathname(nested_dir),
at_path, source_file(filename))}"
-        end
-      end
-
-      def match_repo_dir(nested_dir)
-        config[:repo_links].keys
-          .select{ |key| nested_dir.match(/^#{key}/) }
-          .sort_by{ |key| key.length }
-          .last
-      end
-
-      def source_file(filename)
-        fs = LocalFilesystemAccessor.new
-
-        if current_page.data.dita
-          source_filename = "#{filename}.xml"
-
-          if fs.source_file_exists?(Pathname(preprocessing_path(current_page.source_file)).dirname,
-                                             source_filename)
-            source_filename
-          else
-            ''
-          end
-        else
-          "#{filename}.html.md.erb"
-        end
-      end
-
-      def preprocessing_path(current_source_path)
-        root_path, nested_repo_path = current_source_path.split('source')
-
-        root_path.gsub!('/output/master_middleman', '')
-
-        "#{root_path}output/preprocessing/sections#{nested_repo_path}"
-      end
-
-      def parse_out_nested_dir_and_filename
-        current_page.path
-          .match(/\/?(.*?)\/?([^\/]*)\.html$?/)
-          .captures
-      end
-
-      def extract_nested_directory(nested_dir, repo_dir)
-        nested_dir = nested_dir.gsub("#{repo_dir}", '')
-        nested_dir = nested_dir.sub('/', '') if nested_dir[0] == '/'
-
-        nested_dir
-      end
-
-      def at_path(page_repo_config)
-        path = page_repo_config['at_path'] || ""
-
-        Pathname(path)
-      end
-
-      def add_ancestors_of(page, ancestors)
-        if page
-          add_ancestors_of(page.parent, ancestors + [page])
-        else
-          ancestors
-        end
-      end
-
-      def make_breadcrumb(page, is_current_page)
-        return nil unless (text = page.data.breadcrumb || page.data.title)
-        if is_current_page
-          css_class = 'active'
-          link = content_tag :span, text
-        else
-          link = link_to(text, '/' + page.path)
-        end
-        content_tag :li, link, :class => css_class
-      end
-    end
-    
-    helpers HelperMethods
-    
-
-  end
-end
-::Middleman::Extensions.register(:bookbinder, Bookbinder::Helpers)

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..98bfed9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
    * True if we are connected to a client.
    */
   private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
+
   /**
    * True if a marker message is still in the ha queue.
    */
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -994,8 +949,7 @@ public class CacheClientProxy implements ClientSession {
   private void closeSocket() {
     if (this._socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
-          null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
   }
@@ -1009,7 +963,6 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..fbbe797 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,15 @@
  */
 package org.apache.geode.internal.net;
 
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -26,12 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
 /**
  * This class allows sockets to be closed without blocking. In some cases we have seen a
call of
  * socket.close block for minutes. This class maintains a thread pool for every other member
we have
@@ -51,28 +49,27 @@ public class SocketCloser {
    * minutes).
    */
   static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
   /**
    * Maximum number of threads that can be doing a socket close. Any close requests over
this max
    * will queue up waiting for a thread.
    */
-  static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
   /**
    * How many milliseconds the synchronous requester waits for the async close to happen.
Default is
    * 0. Prior releases waited 50ms.
    */
-  static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
 
 
-  /** map of thread pools of async close threads */
-  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
   private boolean closed;
+  private final ExecutorService socketCloserThreadPool;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -90,53 +87,25 @@ public class SocketCloser {
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+
+    final ThreadGroup threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = command -> {
+      Thread thread = new Thread(threadGroup, command);
+      thread.setDaemon(true);
+      return thread;
+    };
+    socketCloserThreadPool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
+        this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
   }
 
   public int getMaxThreads() {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose",
logger);
-        ThreadFactory tf = new ThreadFactory() {
-          public Thread newThread(final Runnable command) {
-            Thread thread = new Thread(tg, command);
-            thread.setDaemon(true);
-            return thread;
-          }
-        };
-        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
-        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
-            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
-        pool.allowCoreThreadTimeOut(true);
-        asyncCloseExecutors.put(address, pool);
-      }
-      return pool;
-    }
-  }
-
-  /**
-   * Call this method if you know all the resources in the closer for the given address are
no
-   * longer needed. Currently a thread pool is kept for each address and if you know that
an address
-   * no longer needs its pool then you should call this method.
-   */
-  public void releaseResourcesForAddress(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(address);
-      }
-    }
-  }
-
   private boolean isClosed() {
-    synchronized (asyncCloseExecutors) {
-      return this.closed;
-    }
+    return this.closed;
   }
 
   /**
@@ -144,34 +113,9 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    synchronized (asyncCloseExecutors) {
-      if (!this.closed) {
-        this.closed = true;
-        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
-          pool.shutdown();
-        }
-        asyncCloseExecutors.clear();
-      }
-    }
-  }
-
-  private void asyncExecute(String address, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old (close per thread)
-    // code did. But now that we will not create a thread for every close request
-    // it seems better to let the thread that requested the close to move on quickly.
-    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitTime == 0) {
-      getAsyncThreadExecutor(address).execute(r);
-    } else {
-      Future<?> future = getAsyncThreadExecutor(address).submit(r);
-      try {
-        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
+    if (!this.closed) {
+      this.closed = true;
+      socketCloserThreadPool.shutdown();
     }
   }
 
@@ -181,34 +125,40 @@ public class SocketCloser {
    * this method may block for a certain amount of time. If it is called after the SocketCloser
is
    * closed then a normal synchronous close is done.
    * 
-   * @param sock the socket to close
-   * @param address identifies who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async thread
+   * @param socket the socket to close
+   * @param runnableCode an optional Runnable with stuff to execute in the async thread
    */
-  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
+  public void asyncClose(final Socket socket, final Runnable runnableCode) {
+    if (socket == null || socket.isClosed()) {
       return;
     }
+
     boolean doItInline = false;
     try {
-      synchronized (asyncCloseExecutors) {
-        if (isClosed()) {
-          // this SocketCloser has been closed so do a synchronous, inline, close
-          doItInline = true;
-        } else {
-          asyncExecute(address, new Runnable() {
-            public void run() {
-              Thread.currentThread().setName("AsyncSocketCloser for " + address);
-              try {
-                if (extra != null) {
-                  extra.run();
-                }
-                inlineClose(sock);
-              } finally {
-                Thread.currentThread().setName("unused AsyncSocketCloser");
+      if (isClosed()) {
+        // this SocketCloser has been closed so do a synchronous, inline, close
+        doItInline = true;
+      } else {
+        socketCloserThreadPool.execute(() -> {
+          if (runnableCode != null) {
+            runnableCode.run();
+          }
+          inlineClose(socket);
+        });
+        if (this.asyncCloseWaitTime != 0) {
+          try {
+            Future future = socketCloserThreadPool.submit(() -> {
+              if (runnableCode != null) {
+                runnableCode.run();
               }
-            }
-          });
+              inlineClose(socket);
+            });
+            future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+          } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            // We want this code to wait at most 50ms for the close to happen.
+            // It is ok to ignore these exception and let the close continue
+            // in the background.
+          }
         }
       }
     } catch (OutOfMemoryError ignore) {
@@ -217,10 +167,10 @@ public class SocketCloser {
       doItInline = true;
     }
     if (doItInline) {
-      if (extra != null) {
-        extra.run();
+      if (runnableCode != null) {
+        runnableCode.run();
       }
-      inlineClose(sock);
+      inlineClose(socket);
     }
   }
 
@@ -228,19 +178,19 @@ public class SocketCloser {
   /**
    * Closes the specified socket
    * 
-   * @param sock the socket to close
+   * @param socket the socket to close
    */
-  private static void inlineClose(final Socket sock) {
+  private void inlineClose(final Socket socket) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This
     // seems to alleviate the problem.
     try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
+      socket.shutdownInput();
+      socket.shutdownOutput();
     } catch (Exception e) {
     }
     try {
-      sock.close();
+      socket.close();
     } catch (IOException ignore) {
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0ecb3bf..954a33c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
       } catch (IOException io) {
         logger.fatal(LocalizedMessage
             .create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
+        t.getSocketCloser().asyncClose(socket, null);
         throw io;
       }
     }
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+          this.owner.getSocketCloser().asyncClose(s, null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 044ab42..11c3bb3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -929,10 +929,6 @@ public class ConnectionTable {
               owner.getDM().getMembershipManager().getShutdownCause());
         }
       }
-
-      if (remoteAddress != null) {
-        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..b6dbfe2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
  */
 package org.apache.geode.internal.net;
 
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
    */
   @Test
   public void testAsync() {
-    final CountDownLatch cdl = new CountDownLatch(1);
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger waitingToClose = new AtomicInteger(0);
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          waitingToClose.incrementAndGet();
-          cdl.await();
-        } catch (InterruptedException e) {
-        }
-      }
-    };
 
     final int SOCKET_COUNT = 100;
-    final Socket[] aSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      aSockets[i] = createClosableSocket();
-    }
-    // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(aSockets[i], "A", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, aSockets[i].isClosed());
-    }
-    final Socket[] bSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      bSockets[i] = createClosableSocket();
-    }
+    final int REMOTE_CLIENT_COUNT = 200;
+
+    List<Socket> trackedSockets = new ArrayList<>();
     // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(bSockets[i], "B", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, bSockets[i].isClosed());
+    // They should all be stuck on countDownLatch.
+    for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+      Socket[] aSockets = new Socket[SOCKET_COUNT];
+
+      for (int j = 0; j < SOCKET_COUNT; j++) {
+        aSockets[j] = createClosableSocket();
+        trackedSockets.add(aSockets[j]);
+        this.socketCloser.asyncClose(aSockets[j], () -> {
+          try {
+            waitingToClose.incrementAndGet();
+            countDownLatch.await();
+          } catch (InterruptedException e) {
+          }
+        });
+      }
     }
+
     // close the socketCloser first to verify that the sockets
     // that have already been scheduled will be still be closed.
-    this.socketCloser.releaseResourcesForAddress("A");
     this.socketCloser.close();
-    // Each thread pool (one for A and one for B) has a max of 8 threads.
-    // So verify that this many are currently waiting on cdl.
-    {
-      final int maxThreads = this.socketCloser.getMaxThreads();
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return waitingToClose.get() == 2 * maxThreads;
-        }
-
-        public String description() {
-          return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get();
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
-    // now count down the latch that allows the sockets to close
-    cdl.countDown();
+    countDownLatch.countDown();
     // now all the sockets should get closed; use a wait criteria
     // since a thread pool is doing to closes
-    {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          for (int i = 0; i < SOCKET_COUNT; i++) {
-            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
-              return false;
-            }
-          }
-          return true;
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+      boolean areAllClosed = true;
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();)
{
+        Socket socket = iterator.next();
+        if (socket.isClosed()) {
+          iterator.remove();
+          continue;
         }
-
-        public String description() {
-          return "one or more sockets did not close";
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
+        areAllClosed = false;
+      }
+      return areAllClosed;
+    });
   }
 
   /**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocket() throws Exception {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    Wait.pause(10);
-    assertEquals(false, runnableCalled.get());
+    this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
   }
 
   /**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocketCloser() {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
-    final Socket s = createClosableSocket();
+    final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return runnableCalled.get() && s.isClosed();
-      }
-
-      public String description() {
-        return "runnable was not called or socket was not closed";
-      }
-    };
-    Wait.waitForCriterion(wc, 5000, 10, true);
+    this.socketCloser.asyncClose(closableSocket, () -> runnableCalled.set(true));
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }
 }


Mime
View raw message