cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [1/5] git commit: updated refs/heads/master to a04b8f6
Date Fri, 11 Sep 2015 12:56:25 GMT
Repository: cloudstack
Updated Branches:
  refs/heads/master 5812f714f -> a04b8f6e8


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/SerialVersionUID.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java
index e4ea217..0683c8c 100644
--- a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java
+++ b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java
@@ -66,4 +66,6 @@ public interface SerialVersionUID {
     public static final long UnableDeleteHostException = Base | 0x29;
     public static final long AffinityConflictException = Base | 0x2a;
     public static final long JobCancellationException = Base | 0x2b;
+    public static final long NioConnectionException = Base | 0x2c;
+    public static final long TaskExecutionException = Base | 0x2d;
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java
new file mode 100644
index 0000000..e89b3d8
--- /dev/null
+++ b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java
@@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.utils.exception;
+
+import com.cloud.utils.SerialVersionUID;
+
+/**
+ * Used by the NioConnection class to wrap-up its exceptions.
+ */
+public class NioConnectionException extends Exception {
+    private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
+
+    protected int csErrorCode;
+
+    public NioConnectionException(final String msg, final Throwable cause) {
+        super(msg, cause);
+        setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
+    }
+
+    public NioConnectionException(final String msg) {
+        super(msg);
+    }
+
+    public void setCSErrorCode(final int cserrcode) {
+        csErrorCode = cserrcode;
+    }
+
+    public int getCSErrorCode() {
+        return csErrorCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java
new file mode 100644
index 0000000..be639ba
--- /dev/null
+++ b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java
@@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.utils.exception;
+
+import com.cloud.utils.SerialVersionUID;
+
+/**
+ * Used by the Task class to wrap-up its exceptions.
+ */
+public class TaskExecutionException extends Exception {
+    private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
+
+    protected int csErrorCode;
+
+    public TaskExecutionException(final String msg, final Throwable cause) {
+        super(msg, cause);
+        setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
+    }
+
+    public TaskExecutionException(final String msg) {
+        super(msg);
+    }
+
+    public void setCSErrorCode(final int cserrcode) {
+        csErrorCode = cserrcode;
+    }
+
+    public int getCSErrorCode() {
+        return csErrorCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioClient.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
index 2f742f9..d989f30 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
@@ -29,9 +29,8 @@ import java.security.GeneralSecurityException;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
 
 public class NioClient extends NioConnection {
     private static final Logger s_logger = Logger.getLogger(NioClient.class);
@@ -40,12 +39,12 @@ public class NioClient extends NioConnection {
     protected String _bindAddress;
     protected SocketChannel _clientConnection;
 
-    public NioClient(String name, String host, int port, int workers, HandlerFactory factory)
{
+    public NioClient(final String name, final String host, final int port, final int workers,
final HandlerFactory factory) {
         super(name, port, workers, factory);
         _host = host;
     }
 
-    public void setBindAddress(String ipAddress) {
+    public void setBindAddress(final String ipAddress) {
         _bindAddress = ipAddress;
     }
 
@@ -62,18 +61,18 @@ public class NioClient extends NioConnection {
             if (_bindAddress != null) {
                 s_logger.info("Binding outbound interface at " + _bindAddress);
 
-                InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
+                final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
                 _clientConnection.socket().bind(bindAddr);
             }
 
-            InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
+            final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
             _clientConnection.connect(peerAddr);
 
             SSLEngine sslEngine = null;
             // Begin SSL handshake in BLOCKING mode
             _clientConnection.configureBlocking(true);
 
-            SSLContext sslContext = Link.initSSLContext(true);
+            final SSLContext sslContext = Link.initSSLContext(true);
             sslEngine = sslContext.createSSLEngine(_host, _port);
             sslEngine.setUseClientMode(true);
             sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
@@ -83,32 +82,31 @@ public class NioClient extends NioConnection {
             s_logger.info("Connected to " + _host + ":" + _port);
 
             _clientConnection.configureBlocking(false);
-            Link link = new Link(peerAddr, this);
+            final Link link = new Link(peerAddr, this);
             link.setSSLEngine(sslEngine);
-            SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
+            final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
             link.setKey(key);
             key.attach(link);
             // Notice we've already connected due to the handshake, so let's get the
             // remaining task done
             task = _factory.create(Task.Type.CONNECT, link, null);
-        } catch (GeneralSecurityException e) {
+        } catch (final GeneralSecurityException e) {
             _selector.close();
             throw new IOException("Failed to initialise security", e);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             _selector.close();
             throw e;
         }
-
-        _executor.execute(task);
+        _executor.submit(task);
     }
 
     @Override
-    protected void registerLink(InetSocketAddress saddr, Link link) {
+    protected void registerLink(final InetSocketAddress saddr, final Link link) {
         // don't do anything.
     }
 
     @Override
-    protected void unregisterLink(InetSocketAddress saddr) {
+    protected void unregisterLink(final InetSocketAddress saddr) {
         // don't do anything.
     }
 
@@ -119,7 +117,5 @@ public class NioClient extends NioConnection {
             _clientConnection.close();
         }
         s_logger.info("NioClient connection closed");
-
     }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
index 4c66360..ddc84cf 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
@@ -35,7 +36,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -43,21 +47,23 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
 
 import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.exception.NioConnectionException;
 
 /**
  * NioConnection abstracts the NIO socket operations.  The Java implementation
  * provides that.
  */
-public abstract class NioConnection implements Runnable {
+public abstract class NioConnection implements Callable<Boolean> {
     private static final Logger s_logger = Logger.getLogger(NioConnection.class);;
 
     protected Selector _selector;
-    protected Thread _thread;
+    protected ExecutorService _threadExecutor;
+    protected Future<Boolean> _futureTask;
+
     protected boolean _isRunning;
     protected boolean _isStartup;
     protected int _port;
@@ -66,42 +72,48 @@ public abstract class NioConnection implements Runnable {
     protected String _name;
     protected ExecutorService _executor;
 
-    public NioConnection(String name, int port, int workers, HandlerFactory factory) {
+    public NioConnection(final String name, final int port, final int workers, final HandlerFactory
factory) {
         _name = name;
         _isRunning = false;
-        _thread = null;
         _selector = null;
         _port = port;
         _factory = factory;
         _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name + "-Handler"));
     }
 
-    public void start() {
+    public void start() throws NioConnectionException {
         _todos = new ArrayList<ChangeRequest>();
 
-        _thread = new Thread(this, _name + "-Selector");
-        _isRunning = true;
-        _thread.start();
-        // Wait until we got init() done
-        synchronized (_thread) {
-            try {
-                _thread.wait();
-            } catch (InterruptedException e) {
-                s_logger.warn("Interrupted start thread ", e);
-            }
+        try {
+            init();
+        } catch (final ConnectException e) {
+            s_logger.warn("Unable to connect to remote: is there a server running on port
" + _port);
+        } catch (final IOException e) {
+            s_logger.error("Unable to initialize the threads.", e);
+            throw new NioConnectionException(e.getMessage(), e);
+        } catch (final Exception e) {
+            s_logger.error("Unable to initialize the threads due to unknown exception.",
e);
+            throw new NioConnectionException(e.getMessage(), e);
         }
+        _isStartup = true;
+
+        _threadExecutor = Executors.newSingleThreadExecutor();
+        _futureTask = _threadExecutor.submit(this);
+
+        _isRunning = true;
     }
 
     public void stop() {
         _executor.shutdown();
         _isRunning = false;
-        if (_thread != null) {
-            _thread.interrupt();
+        if (_threadExecutor != null) {
+            _futureTask.cancel(false);
+            _threadExecutor.shutdown();
         }
     }
 
     public boolean isRunning() {
-        return _thread.isAlive();
+        return !_futureTask.isDone();
     }
 
     public boolean isStartup() {
@@ -109,45 +121,28 @@ public abstract class NioConnection implements Runnable {
     }
 
     @Override
-    public void run() {
-        synchronized (_thread) {
-            try {
-                init();
-            } catch (ConnectException e) {
-                s_logger.warn("Unable to connect to remote: is there a server running on
port " + _port);
-                return;
-            } catch (IOException e) {
-                s_logger.error("Unable to initialize the threads.", e);
-                return;
-            } catch (Exception e) {
-                s_logger.error("Unable to initialize the threads due to unknown exception.",
e);
-                return;
-            }
-            _isStartup = true;
-            _thread.notifyAll();
-        }
-
+    public Boolean call() throws NioConnectionException {
         while (_isRunning) {
             try {
                 _selector.select();
 
                 // Someone is ready for I/O, get the ready keys
-                Set<SelectionKey> readyKeys = _selector.selectedKeys();
-                Iterator<SelectionKey> i = readyKeys.iterator();
+                final Set<SelectionKey> readyKeys = _selector.selectedKeys();
+                final Iterator<SelectionKey> i = readyKeys.iterator();
 
                 if (s_logger.isTraceEnabled()) {
                     s_logger.trace("Keys Processing: " + readyKeys.size());
                 }
                 // Walk through the ready keys collection.
                 while (i.hasNext()) {
-                    SelectionKey sk = i.next();
+                    final SelectionKey sk = i.next();
                     i.remove();
 
                     if (!sk.isValid()) {
                         if (s_logger.isTraceEnabled()) {
                             s_logger.trace("Selection Key is invalid: " + sk.toString());
                         }
-                        Link link = (Link)sk.attachment();
+                        final Link link = (Link)sk.attachment();
                         if (link != null) {
                             link.terminated();
                         } else {
@@ -167,13 +162,18 @@ public abstract class NioConnection implements Runnable {
                 s_logger.trace("Keys Done Processing.");
 
                 processTodos();
-            } catch (Throwable e) {
-                s_logger.warn("Caught an exception but continuing on.", e);
+            } catch (final ClosedSelectorException e) {
+                /*
+                 * Exception occurred when calling java.nio.channels.Selector.selectedKeys()
method. It means the connection has not yet been established. Let's continue trying
+                 * We do not log it here otherwise we will fill the disk with messages.
+                 */
+            } catch (final IOException e) {
+                s_logger.error("Agent will die due to this IOException!", e);
+                throw new NioConnectionException(e.getMessage(), e);
             }
         }
-        synchronized (_thread) {
-            _isStartup = false;
-        }
+        _isStartup = false;
+        return true;
     }
 
     abstract void init() throws IOException;
@@ -182,11 +182,11 @@ public abstract class NioConnection implements Runnable {
 
     abstract void unregisterLink(InetSocketAddress saddr);
 
-    protected void accept(SelectionKey key) throws IOException {
-        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
+    protected void accept(final SelectionKey key) throws IOException {
+        final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
 
-        SocketChannel socketChannel = serverSocketChannel.accept();
-        Socket socket = socketChannel.socket();
+        final SocketChannel socketChannel = serverSocketChannel.accept();
+        final Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
 
         if (s_logger.isTraceEnabled()) {
@@ -198,7 +198,7 @@ public abstract class NioConnection implements Runnable {
 
         SSLEngine sslEngine = null;
         try {
-            SSLContext sslContext = Link.initSSLContext(false);
+            final SSLContext sslContext = Link.initSSLContext(false);
             sslEngine = sslContext.createSSLEngine();
             sslEngine.setUseClientMode(false);
             sslEngine.setNeedClientAuth(false);
@@ -206,7 +206,7 @@ public abstract class NioConnection implements Runnable {
 
             Link.doHandshake(socketChannel, sslEngine, false);
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             if (s_logger.isTraceEnabled()) {
                 s_logger.trace("Socket " + socket + " closed on read.  Probably -1 returned:
" + e.getMessage());
             }
@@ -219,53 +219,68 @@ public abstract class NioConnection implements Runnable {
             s_logger.trace("SSL: Handshake done");
         }
         socketChannel.configureBlocking(false);
-        InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
-        Link link = new Link(saddr, this);
+        final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
+        final Link link = new Link(saddr, this);
         link.setSSLEngine(sslEngine);
         link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
-        Task task = _factory.create(Task.Type.CONNECT, link, null);
+        final Task task = _factory.create(Task.Type.CONNECT, link, null);
         registerLink(saddr, link);
-        _executor.execute(task);
+
+        try {
+            _executor.submit(task);
+        } catch (final Exception e) {
+            s_logger.warn("Exception occurred when submitting the task", e);
+        }
     }
 
-    protected void terminate(SelectionKey key) {
-        Link link = (Link)key.attachment();
+    protected void terminate(final SelectionKey key) {
+        final Link link = (Link)key.attachment();
         closeConnection(key);
         if (link != null) {
             link.terminated();
-            Task task = _factory.create(Task.Type.DISCONNECT, link, null);
+            final Task task = _factory.create(Task.Type.DISCONNECT, link, null);
             unregisterLink(link.getSocketAddress());
-            _executor.execute(task);
+
+            try {
+                _executor.submit(task);
+            } catch (final Exception e) {
+                s_logger.warn("Exception occurred when submitting the task", e);
+            }
         }
     }
 
-    protected void read(SelectionKey key) throws IOException {
-        Link link = (Link)key.attachment();
+    protected void read(final SelectionKey key) throws IOException {
+        final Link link = (Link)key.attachment();
         try {
-            SocketChannel socketChannel = (SocketChannel)key.channel();
+            final SocketChannel socketChannel = (SocketChannel)key.channel();
             if (s_logger.isTraceEnabled()) {
                 s_logger.trace("Reading from: " + socketChannel.socket().toString());
             }
-            byte[] data = link.read(socketChannel);
+            final byte[] data = link.read(socketChannel);
             if (data == null) {
                 if (s_logger.isTraceEnabled()) {
                     s_logger.trace("Packet is incomplete.  Waiting for more.");
                 }
                 return;
             }
-            Task task = _factory.create(Task.Type.DATA, link, data);
-            _executor.execute(task);
-        } catch (Exception e) {
+            final Task task = _factory.create(Task.Type.DATA, link, data);
+
+            try {
+                _executor.submit(task);
+            } catch (final Exception e) {
+                s_logger.warn("Exception occurred when submitting the task", e);
+            }
+        } catch (final Exception e) {
             logDebug(e, key, 1);
             terminate(key);
         }
     }
 
-    protected void logTrace(Exception e, SelectionKey key, int loc) {
+    protected void logTrace(final Exception e, final SelectionKey key, final int loc) {
         if (s_logger.isTraceEnabled()) {
             Socket socket = null;
             if (key != null) {
-                SocketChannel ch = (SocketChannel)key.channel();
+                final SocketChannel ch = (SocketChannel)key.channel();
                 if (ch != null) {
                     socket = ch.socket();
                 }
@@ -275,11 +290,11 @@ public abstract class NioConnection implements Runnable {
         }
     }
 
-    protected void logDebug(Exception e, SelectionKey key, int loc) {
+    protected void logDebug(final Exception e, final SelectionKey key, final int loc) {
         if (s_logger.isDebugEnabled()) {
             Socket socket = null;
             if (key != null) {
-                SocketChannel ch = (SocketChannel)key.channel();
+                final SocketChannel ch = (SocketChannel)key.channel();
                 if (ch != null) {
                     socket = ch.socket();
                 }
@@ -304,113 +319,122 @@ public abstract class NioConnection implements Runnable {
             s_logger.trace("Todos Processing: " + todos.size());
         }
         SelectionKey key;
-        for (ChangeRequest todo : todos) {
+        for (final ChangeRequest todo : todos) {
             switch (todo.type) {
-                case ChangeRequest.CHANGEOPS:
-                    try {
-                        key = (SelectionKey)todo.key;
-                        if (key != null && key.isValid()) {
-                            if (todo.att != null) {
-                                key.attach(todo.att);
-                                Link link = (Link)todo.att;
-                                link.setKey(key);
-                            }
-                            key.interestOps(todo.ops);
-                        }
-                    } catch (CancelledKeyException e) {
-                        s_logger.debug("key has been cancelled");
-                    }
-                    break;
-                case ChangeRequest.REGISTER:
-                    try {
-                        key = ((SocketChannel)(todo.key)).register(_selector, todo.ops, todo.att);
+            case ChangeRequest.CHANGEOPS:
+                try {
+                    key = (SelectionKey)todo.key;
+                    if (key != null && key.isValid()) {
                         if (todo.att != null) {
-                            Link link = (Link)todo.att;
+                            key.attach(todo.att);
+                            final Link link = (Link)todo.att;
                             link.setKey(key);
                         }
-                    } catch (ClosedChannelException e) {
-                        s_logger.warn("Couldn't register socket: " + todo.key);
-                        try {
-                            ((SocketChannel)(todo.key)).close();
-                        } catch (IOException ignore) {
-                            s_logger.info("[ignored] socket channel");
-                        } finally {
-                            Link link = (Link)todo.att;
-                            link.terminated();
-                        }
+                        key.interestOps(todo.ops);
                     }
-                    break;
-                case ChangeRequest.CLOSE:
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace("Trying to close " + todo.key);
+                } catch (final CancelledKeyException e) {
+                    s_logger.debug("key has been cancelled");
+                }
+                break;
+            case ChangeRequest.REGISTER:
+                try {
+                    key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att);
+                    if (todo.att != null) {
+                        final Link link = (Link)todo.att;
+                        link.setKey(key);
                     }
-                    key = (SelectionKey)todo.key;
-                    closeConnection(key);
-                    if (key != null) {
-                        Link link = (Link)key.attachment();
-                        if (link != null) {
-                            link.terminated();
-                        }
+                } catch (final ClosedChannelException e) {
+                    s_logger.warn("Couldn't register socket: " + todo.key);
+                    try {
+                        ((SocketChannel)todo.key).close();
+                    } catch (final IOException ignore) {
+                        s_logger.info("[ignored] socket channel");
+                    } finally {
+                        final Link link = (Link)todo.att;
+                        link.terminated();
+                    }
+                }
+                break;
+            case ChangeRequest.CLOSE:
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace("Trying to close " + todo.key);
+                }
+                key = (SelectionKey)todo.key;
+                closeConnection(key);
+                if (key != null) {
+                    final Link link = (Link)key.attachment();
+                    if (link != null) {
+                        link.terminated();
                     }
-                    break;
-                default:
-                    s_logger.warn("Shouldn't be here");
-                    throw new RuntimeException("Shouldn't be here");
+                }
+                break;
+            default:
+                s_logger.warn("Shouldn't be here");
+                throw new RuntimeException("Shouldn't be here");
             }
         }
         s_logger.trace("Todos Done processing");
     }
 
-    protected void connect(SelectionKey key) throws IOException {
-        SocketChannel socketChannel = (SocketChannel)key.channel();
+    protected void connect(final SelectionKey key) throws IOException {
+        final SocketChannel socketChannel = (SocketChannel)key.channel();
 
         try {
             socketChannel.finishConnect();
             key.interestOps(SelectionKey.OP_READ);
-            Socket socket = socketChannel.socket();
+            final Socket socket = socketChannel.socket();
             if (!socket.getKeepAlive()) {
                 socket.setKeepAlive(true);
             }
             if (s_logger.isDebugEnabled()) {
                 s_logger.debug("Connected to " + socket);
             }
-            Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this);
+            final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(),
this);
             link.setKey(key);
             key.attach(link);
-            Task task = _factory.create(Task.Type.CONNECT, link, null);
-            _executor.execute(task);
-        } catch (IOException e) {
+            final Task task = _factory.create(Task.Type.CONNECT, link, null);
+
+            try {
+                _executor.submit(task);
+            } catch (final Exception e) {
+                s_logger.warn("Exception occurred when submitting the task", e);
+            }
+        } catch (final IOException e) {
             logTrace(e, key, 2);
             terminate(key);
         }
     }
 
-    protected void scheduleTask(Task task) {
-        _executor.execute(task);
+    protected void scheduleTask(final Task task) {
+        try {
+            _executor.submit(task);
+        } catch (final Exception e) {
+            s_logger.warn("Exception occurred when submitting the task", e);
+        }
     }
 
-    protected void write(SelectionKey key) throws IOException {
-        Link link = (Link)key.attachment();
+    protected void write(final SelectionKey key) throws IOException {
+        final Link link = (Link)key.attachment();
         try {
             if (s_logger.isTraceEnabled()) {
                 s_logger.trace("Writing to " + link.getSocketAddress().toString());
             }
-            boolean close = link.write((SocketChannel)key.channel());
+            final boolean close = link.write((SocketChannel)key.channel());
             if (close) {
                 closeConnection(key);
                 link.terminated();
             } else {
                 key.interestOps(SelectionKey.OP_READ);
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             logDebug(e, key, 3);
             terminate(key);
         }
     }
 
-    protected void closeConnection(SelectionKey key) {
+    protected void closeConnection(final SelectionKey key) {
         if (key != null) {
-            SocketChannel channel = (SocketChannel)key.channel();
+            final SocketChannel channel = (SocketChannel)key.channel();
             key.cancel();
             try {
                 if (channel != null) {
@@ -419,30 +443,30 @@ public abstract class NioConnection implements Runnable {
                     }
                     channel.close();
                 }
-            } catch (IOException ignore) {
+            } catch (final IOException ignore) {
                 s_logger.info("[ignored] channel");
             }
         }
     }
 
-    public void register(int ops, SocketChannel key, Object att) {
-        ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att);
+    public void register(final int ops, final SocketChannel key, final Object att) {
+        final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att);
         synchronized (this) {
             _todos.add(todo);
         }
         _selector.wakeup();
     }
 
-    public void change(int ops, SelectionKey key, Object att) {
-        ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att);
+    public void change(final int ops, final SelectionKey key, final Object att) {
+        final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att);
         synchronized (this) {
             _todos.add(todo);
         }
         _selector.wakeup();
     }
 
-    public void close(SelectionKey key) {
-        ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null);
+    public void close(final SelectionKey key) {
+        final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null);
         synchronized (this) {
             _todos.add(todo);
         }
@@ -466,7 +490,7 @@ public abstract class NioConnection implements Runnable {
         public int ops;
         public Object att;
 
-        public ChangeRequest(Object key, int type, int ops, Object att) {
+        public ChangeRequest(final Object key, final int type, final int ops, final Object
att) {
             this.key = key;
             this.type = type;
             this.ops = ops;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioServer.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
index 98a4a51..539c2bb 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
@@ -37,7 +37,7 @@ public class NioServer extends NioConnection {
 
     protected WeakHashMap<InetSocketAddress, Link> _links;
 
-    public NioServer(String name, int port, int workers, HandlerFactory factory) {
+    public NioServer(final String name, final int port, final int workers, final HandlerFactory
factory) {
         super(name, port, workers, factory);
         _localAddr = null;
         _links = new WeakHashMap<InetSocketAddress, Link>(1024);
@@ -68,12 +68,12 @@ public class NioServer extends NioConnection {
     }
 
     @Override
-    protected void registerLink(InetSocketAddress addr, Link link) {
+    protected void registerLink(final InetSocketAddress addr, final Link link) {
         _links.put(addr, link);
     }
 
     @Override
-    protected void unregisterLink(InetSocketAddress saddr) {
+    protected void unregisterLink(final InetSocketAddress saddr) {
         _links.remove(saddr);
     }
 
@@ -86,8 +86,8 @@ public class NioServer extends NioConnection {
      * @param data
      * @return null if not sent.  attach object in link if sent.
      */
-    public Object send(InetSocketAddress saddr, byte[] data) throws ClosedChannelException
{
-        Link link = _links.get(saddr);
+    public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException
{
+        final Link link = _links.get(saddr);
         if (link == null) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/Task.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/Task.java b/utils/src/main/java/com/cloud/utils/nio/Task.java
index c77c703..60228ef 100644
--- a/utils/src/main/java/com/cloud/utils/nio/Task.java
+++ b/utils/src/main/java/com/cloud/utils/nio/Task.java
@@ -19,14 +19,14 @@
 
 package com.cloud.utils.nio;
 
-import org.apache.log4j.Logger;
+import java.util.concurrent.Callable;
+
+import com.cloud.utils.exception.TaskExecutionException;
 
 /**
  * Task represents one todo item for the AgentManager or the AgentManager
- *
  */
-public abstract class Task implements Runnable {
-    private static final Logger s_logger = Logger.getLogger(Task.class);
+public abstract class Task implements Callable<Boolean> {
 
     public enum Type {
         CONNECT,     // Process a new connection.
@@ -40,13 +40,13 @@ public abstract class Task implements Runnable {
     Type _type;
     Link _link;
 
-    public Task(Type type, Link link, byte[] data) {
+    public Task(final Type type, final Link link, final byte[] data) {
         _data = data;
         _type = type;
         _link = link;
     }
 
-    public Task(Type type, Link link, Object data) {
+    public Task(final Type type, final Link link, final Object data) {
         _data = data;
         _type = type;
         _link = link;
@@ -76,14 +76,11 @@ public abstract class Task implements Runnable {
         return _type.toString();
     }
 
-    abstract protected void doTask(Task task) throws Exception;
+    abstract protected void doTask(Task task) throws TaskExecutionException;
 
     @Override
-    public final void run() {
-        try {
-            doTask(this);
-        } catch (Throwable e) {
-            s_logger.warn("Caught the following exception but pushing on", e);
-        }
+    public Boolean call() throws TaskExecutionException {
+        doTask(this);
+        return true;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
----------------------------------------------------------------------
diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
index fc16684..d8510cf 100644
--- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
+++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
@@ -27,6 +27,7 @@ import junit.framework.TestCase;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
 
+import com.cloud.utils.exception.NioConnectionException;
 import com.cloud.utils.nio.HandlerFactory;
 import com.cloud.utils.nio.Link;
 import com.cloud.utils.nio.NioClient;
@@ -56,7 +57,7 @@ public class NioTest extends TestCase {
     private boolean isTestsDone() {
         boolean result;
         synchronized (this) {
-            result = (_testCount == _completedCount);
+            result = _testCount == _completedCount;
         }
         return result;
     }
@@ -81,16 +82,24 @@ public class NioTest extends TestCase {
         _completedCount = 0;
 
         _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer());
-        _server.start();
+        try {
+            _server.start();
+        } catch (final NioConnectionException e) {
+            fail(e.getMessage());
+        }
 
         _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient());
-        _client.start();
+        try {
+            _client.start();
+        } catch (final NioConnectionException e) {
+            fail(e.getMessage());
+        }
 
         while (_clientLink == null) {
             try {
                 s_logger.debug("Link is not up! Waiting ...");
                 Thread.sleep(1000);
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
             }
@@ -101,9 +110,9 @@ public class NioTest extends TestCase {
     public void tearDown() {
         while (!isTestsDone()) {
             try {
-                s_logger.debug(this._completedCount + "/" + this._testCount + " tests done.
Waiting for completion");
+                s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting
for completion");
                 Thread.sleep(1000);
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
             }
@@ -122,7 +131,7 @@ public class NioTest extends TestCase {
         s_logger.info("Server stopped.");
     }
 
-    protected void setClientLink(Link link) {
+    protected void setClientLink(final Link link) {
         _clientLink = link;
     }
 
@@ -140,13 +149,13 @@ public class NioTest extends TestCase {
             getOneMoreTest();
             _clientLink.send(_testBytes);
             s_logger.info("Client: Data sent");
-        } catch (ClosedChannelException e) {
+        } catch (final ClosedChannelException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }
 
-    protected void doServerProcess(byte[] data) {
+    protected void doServerProcess(final byte[] data) {
         oneMoreTestDone();
         Assert.assertArrayEquals(_testBytes, data);
         s_logger.info("Verify done.");
@@ -155,13 +164,13 @@ public class NioTest extends TestCase {
     public class NioTestClient implements HandlerFactory {
 
         @Override
-        public Task create(Type type, Link link, byte[] data) {
+        public Task create(final Type type, final Link link, final byte[] data) {
             return new NioTestClientHandler(type, link, data);
         }
 
         public class NioTestClientHandler extends Task {
 
-            public NioTestClientHandler(Type type, Link link, byte[] data) {
+            public NioTestClientHandler(final Type type, final Link link, final byte[] data)
{
                 super(type, link, data);
             }
 
@@ -186,13 +195,13 @@ public class NioTest extends TestCase {
     public class NioTestServer implements HandlerFactory {
 
         @Override
-        public Task create(Type type, Link link, byte[] data) {
+        public Task create(final Type type, final Link link, final byte[] data) {
             return new NioTestServerHandler(type, link, data);
         }
 
         public class NioTestServerHandler extends Task {
 
-            public NioTestServerHandler(Type type, Link link, byte[] data) {
+            public NioTestServerHandler(final Type type, final Link link, final byte[] data)
{
                 super(type, link, data);
             }
 


Mime
View raw message