tomee-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andygumbre...@apache.org
Subject svn commit: r1232452 - /openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Date Tue, 17 Jan 2012 16:02:35 GMT
Author: andygumbrecht
Date: Tue Jan 17 16:02:35 2012
New Revision: 1232452

URL: http://svn.apache.org/viewvc?rev=1232452&view=rev
Log:
Fix cleanup.

Modified:
    openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java

Modified: openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1232452&r1=1232451&r2=1232452&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original)
+++ openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Tue Jan 17 16:02:35 2012
@@ -32,7 +32,8 @@ import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -53,7 +54,8 @@ public class KeepAliveServer implements 
     private final long timeout = (1000 * 3);
 
     private final AtomicBoolean running = new AtomicBoolean(false);
-    private final KeepAliveTimer keepAliveTimer;
+    private final ConcurrentHashMap<Thread, Session> sessions = new ConcurrentHashMap<Thread, Session>();
+    private BlockingQueue<Runnable> threadQueue;
     private Timer timer;
 
     public KeepAliveServer() {
@@ -62,105 +64,113 @@ public class KeepAliveServer implements 
 
     public KeepAliveServer(final ServerService service) {
         this.service = service;
-        this.keepAliveTimer = new KeepAliveTimer();
     }
 
-    public class KeepAliveTimer extends TimerTask {
-
-        // Doesn't need to be a map.  Could be a set if Session.equals/hashCode only referenced the Thread.
-        private final Map<Thread, Session> sessions = new ConcurrentHashMap<Thread, Session>();
-
-        private BlockingQueue<Runnable> queue;
-
-        @Override
-        public void run() {
-            if (running.get()) {
-                closeInactiveSessions();
-            }
-        }
+    private void closeInactiveSessions() {
 
-        private void closeInactiveSessions() {
-            final BlockingQueue<Runnable> queue = getQueue();
-            if (queue == null) return;
+        if (!this.running.get()) return;
 
-            int backlog = queue.size();
-            if (backlog <= 0) return;
-
-            final long now = System.currentTimeMillis();
+        final BlockingQueue<Runnable> queue = getQueue();
+        if (queue == null) return;
 
-            for (final Session session : sessions.values()) {
-
-                if (session.usage.tryLock()) {
-                    try {
-                        if (now - session.lastRequest > timeout) {
-                            try {
-                                backlog--;
-                                session.socket.close();
-                            } catch (IOException e) {
-                                if (logger.isWarningEnabled()) {
-                                    logger.warning("closeInactiveSessions: Error closing socket. Debug for StackTrace");
-                                } else if (logger.isDebugEnabled()) {
-                                    logger.debug("closeInactiveSessions: Error closing socket.", e);
-                                }
-                            } finally {
-                                removeSession(session);
+        int backlog = queue.size();
+        if (backlog <= 0) return;
+
+        final long now = System.currentTimeMillis();
+
+        final List<Session> current = new ArrayList<Session>();
+        current.addAll(this.sessions.values());
+
+        for (final Session session : current) {
+
+            if (session.usage.tryLock()) {
+                try {
+                    if (now - session.lastRequest > timeout) {
+                        try {
+                            backlog--;
+                            session.socket.close();
+                        } catch (IOException e) {
+                            if (logger.isWarningEnabled()) {
+                                logger.warning("closeInactiveSessions: Error closing socket. Debug for StackTrace");
+                            } else if (logger.isDebugEnabled()) {
+                                logger.debug("closeInactiveSessions: Error closing socket.", e);
                             }
+                        } finally {
+                            removeSession(session);
                         }
-                    } finally {
-                        session.usage.unlock();
                     }
+                } finally {
+                    session.usage.unlock();
                 }
-
-                if (backlog <= 0) return;
             }
+
+            if (backlog <= 0) return;
         }
+    }
 
-        public void closeSessions() {
+    public void closeSessions() {
 
-            // Close the ones we can
-            for (final Session session : sessions.values()) {
-                if (session.usage.tryLock()) {
-                    try {
-                        session.socket.close();
-                    } catch (IOException e) {
-                        if (logger.isWarningEnabled()) {
-                            logger.warning("closeSessions: Error closing socket. Debug for StackTrace");
-                        } else if (logger.isDebugEnabled()) {
-                            logger.debug("closeSessions: Error closing socket.", e);
-                        }
-                    } finally {
-                        removeSession(session);
-                        session.usage.unlock();
+        // Close the ones we can
+        final List<Session> current = new ArrayList<Session>();
+        current.addAll(this.sessions.values());
+
+        for (final Session session : current) {
+            if (session.usage.tryLock()) {
+                try {
+                    session.socket.close();
+                } catch (IOException e) {
+                    if (logger.isWarningEnabled()) {
+                        logger.warning("closeSessions: Error closing socket. Debug for StackTrace");
+                    } else if (logger.isDebugEnabled()) {
+                        logger.debug("closeSessions: Error closing socket.", e);
                     }
-                } else if (logger.isDebugEnabled()) {
-                    logger.debug("Allowing graceful shutdown of " + session.socket.getInetAddress());
+                } finally {
+                    removeSession(session);
+                    session.usage.unlock();
                 }
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("Allowing graceful shutdown of " + session.socket.getInetAddress());
             }
         }
+    }
 
-        private BlockingQueue<Runnable> getQueue() {
-            if (queue == null) {
-                // this can be null if timer fires before service is fully initialized
-                final ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
-                if (incoming == null) return null;
-                final ThreadPoolExecutor threadPool = incoming.getThreadPool();
-                queue = threadPool.getQueue();
-            }
-            return queue;
+    private BlockingQueue<Runnable> getQueue() {
+        if (this.threadQueue == null) {
+            // this can be null if timer fires before service is fully initialized
+            final ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class);
+            if (incoming == null) return null;
+            final ThreadPoolExecutor threadPool = incoming.getThreadPool();
+            this.threadQueue = threadPool.getQueue();
         }
+        return this.threadQueue;
+    }
+
+    public Session addSession(final Session session) {
+        return this.sessions.put(session.thread, session);
+    }
+
+    public Session removeSession(final Session session) {
+        return this.sessions.remove(session.thread);
+    }
+
+    public class KeepAliveTimer extends TimerTask {
+
+        private final KeepAliveServer kas;
 
-        public Session addSession(final Session session) {
-            return sessions.put(session.thread, session);
+        public KeepAliveTimer(final org.apache.openejb.server.ejbd.KeepAliveServer kas) {
+            this.kas = kas;
         }
 
-        public Session removeSession(final Session session) {
-            return sessions.remove(session.thread);
+        @Override
+        public void run() {
+            this.kas.closeInactiveSessions();
         }
     }
 
     private class Session {
 
         private final Thread thread;
+        private final KeepAliveServer kas;
         private final Lock usage = new ReentrantLock();
 
         // only used inside the Lock
@@ -169,14 +179,15 @@ public class KeepAliveServer implements 
         // only used inside the Lock
         private final Socket socket;
 
-        public Session(final Socket socket) {
+        public Session(final KeepAliveServer kas, final Socket socket) {
+            this.kas = kas;
             this.socket = socket;
             this.lastRequest = System.currentTimeMillis();
             this.thread = Thread.currentThread();
         }
 
         public void service(final Socket socket) throws ServiceException, IOException {
-            keepAliveTimer.addSession(this);
+            this.kas.addSession(this);
 
             int i = -1;
 
@@ -225,7 +236,7 @@ public class KeepAliveServer implements 
             } catch (InterruptedIOException e) {
                 Thread.interrupted();
             } finally {
-                keepAliveTimer.removeSession(this);
+                this.kas.removeSession(this);
             }
         }
     }
@@ -233,7 +244,7 @@ public class KeepAliveServer implements 
 
     @Override
     public void service(final Socket socket) throws ServiceException, IOException {
-        final Session session = new Session(socket);
+        final Session session = new Session(this, socket);
         session.service(socket);
     }
 
@@ -260,7 +271,7 @@ public class KeepAliveServer implements 
     public void start() throws ServiceException {
         if (!this.running.getAndSet(true)) {
             this.timer = new Timer("KeepAliveTimer", true);
-            this.timer.scheduleAtFixedRate(this.keepAliveTimer, this.timeout, (this.timeout / 2));
+            this.timer.scheduleAtFixedRate(new KeepAliveTimer(this), this.timeout, (this.timeout / 2));
         }
     }
 
@@ -268,7 +279,7 @@ public class KeepAliveServer implements 
     public void stop() throws ServiceException {
         if (this.running.getAndSet(false)) {
             try {
-                this.keepAliveTimer.closeSessions();
+                this.closeSessions();
             } catch (Throwable e) {
                 //Ignore
             }



Mime
View raw message