incubator-ftpserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From n..@apache.org
Subject svn commit: r517289 - /incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java
Date Mon, 12 Mar 2007 17:09:50 GMT
Author: ngn
Date: Mon Mar 12 10:09:50 2007
New Revision: 517289

URL: http://svn.apache.org/viewvc?view=rev&rev=517289
Log:
Tuned MINA listener to scale better (see http://mina.apache.org/configuring-thread-model.html).
Also allows for providing your own thread models if necessary.

Modified:
    incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java

Modified: incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java
URL: http://svn.apache.org/viewvc/incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java?view=diff&rev=517289&r1=517288&r2=517289
==============================================================================
--- incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java (original)
+++ incubator/ftpserver/trunk/core/src/java/org/apache/ftpserver/listener/mina/MinaListener.java Mon Mar 12 10:09:50 2007
@@ -27,16 +27,23 @@
 import org.apache.ftpserver.listener.AbstractListener;
 import org.apache.ftpserver.listener.FtpProtocolHandler;
 import org.apache.ftpserver.listener.Listener;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.ThreadModel;
 import org.apache.mina.filter.LoggingFilter;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
 /**
  * The default {@link Listener} implementation.
  *
@@ -45,7 +52,7 @@
 
     private final Logger LOG = LoggerFactory.getLogger(MinaListener.class);
 
-    private IoAcceptor acceptor = new SocketAcceptor();
+    private IoAcceptor acceptor;
     
     private InetSocketAddress address;
     
@@ -55,11 +62,17 @@
     
     boolean suspended = false;
 
+    private int numberOfIoProcessingThread = Runtime.getRuntime().availableProcessors() + 1;
+    private ExecutorService ioProcessingExecutor = Executors.newCachedThreadPool();
+    private ExecutorService filterExecutor = Executors.newCachedThreadPool();
+
     /**
      * @see Listener#start(FtpServerContext)
      */
     public void start(FtpServerContext serverContext) throws Exception {
         
+        acceptor = new SocketAcceptor(numberOfIoProcessingThread, ioProcessingExecutor);
+        
         if(getServerAddress() != null) {
             address = new InetSocketAddress(getServerAddress(), getPort() );
         } else {
@@ -74,8 +87,13 @@
                 new ProtocolCodecFilter( new FtpServerProtocolCodecFactory() ) );
         cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
 
+        cfg.setThreadModel(ThreadModel.MANUAL);
+        
+        DefaultIoFilterChainBuilder filterChainBuilder = cfg.getFilterChain();
+        filterChainBuilder.addLast("threadPool", new ExecutorFilter(filterExecutor));
+        
         // Decrease the default receiver buffer size
-        ((SocketSessionConfig) acceptor.getDefaultConfig().getSessionConfig()).setReceiveBufferSize(512);
+        ((SocketSessionConfig) cfg.getSessionConfig()).setReceiveBufferSize(512); 
         
         if(isImplicitSsl()) {
             try {
@@ -88,7 +106,6 @@
             
         }
         
-        
         protocolHandler = new MinaFtpProtocolHandler(serverContext, new FtpProtocolHandler(serverContext), this);
 
         acceptor.bind(address, protocolHandler, cfg );
@@ -103,6 +120,25 @@
             acceptor.unbindAll();
             acceptor = null;
         }
+        
+        if(ioProcessingExecutor != null) {
+            ioProcessingExecutor.shutdown();
+            try {
+                ioProcessingExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // TODO: how to handle?
+            }
+        }
+        
+        if(filterExecutor != null) {
+            filterExecutor.shutdown();
+            try {
+                filterExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+            } finally {
+//              TODO: how to handle?
+            }
+        }
     }
 
     /**
@@ -140,5 +176,56 @@
         if(acceptor != null && !suspended) {
             acceptor.unbind(address);
         }
+    }
+
+    /**
+     * Get the {@link ExecutorService} used for processing requests. The default
+     * value is a cached thread pool.
+     * @return The {@link ExecutorService}
+     */
+    public ExecutorService getFilterExecutor() {
+        return filterExecutor;
+    }
+
+    /**
+     * Set the {@link ExecutorService} used for processing requests
+     * @param filterExecutor The {@link ExecutorService}
+     */
+    public void setFilterExecutor(ExecutorService filterExecutor) {
+        this.filterExecutor = filterExecutor;
+    }
+
+    /**
+     * Get the {@link ExecutorService} used for reading and writing to sockets. The default
+     * value is a cached thread pool.
+     * @return The {@link ExecutorService}
+     */
+    public ExecutorService getIoProcessingExecutor() {
+        return ioProcessingExecutor;
+    }
+
+    /**
+     * Set the {@link ExecutorService} used for reading and writing to sockets
+     * @param ioProcessingExecutor The {@link ExecutorService}
+     */
+    public void setIoProcessingExecutor(ExecutorService ioProcessingExecutor) {
+        this.ioProcessingExecutor = ioProcessingExecutor;
+    }
+
+    /**
+     * Get the number of threads used for IO processing. The default value is
+     * set to the number of available CPUs + 1
+     * @return The number of threads used for IO processing
+     */
+    public int getNumberOfIoProcessingThread() {
+        return numberOfIoProcessingThread;
+    }
+
+    /**
+     * Set the number of threads used for IO processing
+     * @param numberOfIoProcessingThread The number of threads used for IO processing.
+     */
+    public void setNumberOfIoProcessingThread(int numberOfIoProcessingThread) {
+        this.numberOfIoProcessingThread = numberOfIoProcessingThread;
     }
 }



Mime
View raw message