Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2569717D35 for ; Fri, 11 Sep 2015 12:56:26 +0000 (UTC) Received: (qmail 34704 invoked by uid 500); 11 Sep 2015 12:56:26 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 34656 invoked by uid 500); 11 Sep 2015 12:56:25 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 34636 invoked by uid 99); 11 Sep 2015 12:56:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2015 12:56:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0762DFFBF; Fri, 11 Sep 2015 12:56:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ekho@apache.org To: commits@cloudstack.apache.org Date: Fri, 11 Sep 2015 12:56:25 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] git commit: updated refs/heads/master to a04b8f6 Repository: cloudstack Updated Branches: refs/heads/master 5812f714f -> a04b8f6e8 http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/SerialVersionUID.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java index e4ea217..0683c8c 100644 --- a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java +++ b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java @@ -66,4 +66,6 @@ public interface SerialVersionUID { public static final long UnableDeleteHostException = Base | 0x29; public static final long AffinityConflictException = Base | 0x2a; public static final long JobCancellationException = Base | 0x2b; + public static final long NioConnectionException = Base | 0x2c; + public static final long TaskExecutionException = Base | 0x2d; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java new file mode 100644 index 0000000..e89b3d8 --- /dev/null +++ b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package com.cloud.utils.exception; + +import com.cloud.utils.SerialVersionUID; + +/** + * Used by the NioConnection class to wrap-up its exceptions. + */ +public class NioConnectionException extends Exception { + private static final long serialVersionUID = SerialVersionUID.NioConnectionException; + + protected int csErrorCode; + + public NioConnectionException(final String msg, final Throwable cause) { + super(msg, cause); + setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName())); + } + + public NioConnectionException(final String msg) { + super(msg); + } + + public void setCSErrorCode(final int cserrcode) { + csErrorCode = cserrcode; + } + + public int getCSErrorCode() { + return csErrorCode; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java new file mode 100644 index 0000000..be639ba --- /dev/null +++ b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package com.cloud.utils.exception; + +import com.cloud.utils.SerialVersionUID; + +/** + * Used by the Task class to wrap-up its exceptions. + */ +public class TaskExecutionException extends Exception { + private static final long serialVersionUID = SerialVersionUID.NioConnectionException; + + protected int csErrorCode; + + public TaskExecutionException(final String msg, final Throwable cause) { + super(msg, cause); + setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName())); + } + + public TaskExecutionException(final String msg) { + super(msg); + } + + public void setCSErrorCode(final int cserrcode) { + csErrorCode = cserrcode; + } + + public int getCSErrorCode() { + return csErrorCode; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioClient.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index 2f742f9..d989f30 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -29,9 +29,8 @@ import java.security.GeneralSecurityException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; public class NioClient extends NioConnection { private static final Logger s_logger = Logger.getLogger(NioClient.class); @@ -40,12 +39,12 @@ public class NioClient extends NioConnection { protected String _bindAddress; protected SocketChannel _clientConnection; - public NioClient(String name, String host, int port, int workers, HandlerFactory factory) { + public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) { super(name, port, workers, factory); _host = host; } - public void setBindAddress(String ipAddress) { + public void setBindAddress(final String ipAddress) { _bindAddress = ipAddress; } @@ -62,18 +61,18 @@ public class NioClient extends NioConnection { if (_bindAddress != null) { s_logger.info("Binding outbound interface at " + _bindAddress); - InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); + final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); _clientConnection.socket().bind(bindAddr); } - InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); + final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); _clientConnection.connect(peerAddr); SSLEngine sslEngine = null; // Begin SSL handshake in BLOCKING mode _clientConnection.configureBlocking(true); - SSLContext sslContext = Link.initSSLContext(true); + final SSLContext sslContext = Link.initSSLContext(true); sslEngine = sslContext.createSSLEngine(_host, _port); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); @@ -83,32 +82,31 @@ public class NioClient extends NioConnection { s_logger.info("Connected to " + _host + ":" + _port); _clientConnection.configureBlocking(false); - Link link = new Link(peerAddr, this); + final Link link = new Link(peerAddr, this); link.setSSLEngine(sslEngine); - SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); + final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); link.setKey(key); key.attach(link); // Notice we've already connected due to the handshake, so let's get the // remaining task done task = _factory.create(Task.Type.CONNECT, link, null); - } catch (GeneralSecurityException e) { + } catch (final GeneralSecurityException e) { _selector.close(); throw new IOException("Failed to initialise security", e); - } catch (IOException e) { + } catch (final IOException e) { _selector.close(); throw e; } - - _executor.execute(task); + _executor.submit(task); } @Override - protected void registerLink(InetSocketAddress saddr, Link link) { + protected void registerLink(final InetSocketAddress saddr, final Link link) { // don't do anything. } @Override - protected void unregisterLink(InetSocketAddress saddr) { + protected void unregisterLink(final InetSocketAddress saddr) { // don't do anything. } @@ -119,7 +117,5 @@ public class NioClient extends NioConnection { _clientConnection.close(); } s_logger.info("NioClient connection closed"); - } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioConnection.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 4c66360..ddc84cf 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -35,7 +36,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -43,21 +47,23 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; import com.cloud.utils.concurrency.NamedThreadFactory; +import com.cloud.utils.exception.NioConnectionException; /** * NioConnection abstracts the NIO socket operations. The Java implementation * provides that. */ -public abstract class NioConnection implements Runnable { +public abstract class NioConnection implements Callable { private static final Logger s_logger = Logger.getLogger(NioConnection.class);; protected Selector _selector; - protected Thread _thread; + protected ExecutorService _threadExecutor; + protected Future _futureTask; + protected boolean _isRunning; protected boolean _isStartup; protected int _port; @@ -66,42 +72,48 @@ public abstract class NioConnection implements Runnable { protected String _name; protected ExecutorService _executor; - public NioConnection(String name, int port, int workers, HandlerFactory factory) { + public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; _isRunning = false; - _thread = null; _selector = null; _port = port; _factory = factory; _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory(name + "-Handler")); } - public void start() { + public void start() throws NioConnectionException { _todos = new ArrayList(); - _thread = new Thread(this, _name + "-Selector"); - _isRunning = true; - _thread.start(); - // Wait until we got init() done - synchronized (_thread) { - try { - _thread.wait(); - } catch (InterruptedException e) { - s_logger.warn("Interrupted start thread ", e); - } + try { + init(); + } catch (final ConnectException e) { + s_logger.warn("Unable to connect to remote: is there a server running on port " + _port); + } catch (final IOException e) { + s_logger.error("Unable to initialize the threads.", e); + throw new NioConnectionException(e.getMessage(), e); + } catch (final Exception e) { + s_logger.error("Unable to initialize the threads due to unknown exception.", e); + throw new NioConnectionException(e.getMessage(), e); } + _isStartup = true; + + _threadExecutor = Executors.newSingleThreadExecutor(); + _futureTask = _threadExecutor.submit(this); + + _isRunning = true; } public void stop() { _executor.shutdown(); _isRunning = false; - if (_thread != null) { - _thread.interrupt(); + if (_threadExecutor != null) { + _futureTask.cancel(false); + _threadExecutor.shutdown(); } } public boolean isRunning() { - return _thread.isAlive(); + return !_futureTask.isDone(); } public boolean isStartup() { @@ -109,45 +121,28 @@ public abstract class NioConnection implements Runnable { } @Override - public void run() { - synchronized (_thread) { - try { - init(); - } catch (ConnectException e) { - s_logger.warn("Unable to connect to remote: is there a server running on port " + _port); - return; - } catch (IOException e) { - s_logger.error("Unable to initialize the threads.", e); - return; - } catch (Exception e) { - s_logger.error("Unable to initialize the threads due to unknown exception.", e); - return; - } - _isStartup = true; - _thread.notifyAll(); - } - + public Boolean call() throws NioConnectionException { while (_isRunning) { try { _selector.select(); // Someone is ready for I/O, get the ready keys - Set readyKeys = _selector.selectedKeys(); - Iterator i = readyKeys.iterator(); + final Set readyKeys = _selector.selectedKeys(); + final Iterator i = readyKeys.iterator(); if (s_logger.isTraceEnabled()) { s_logger.trace("Keys Processing: " + readyKeys.size()); } // Walk through the ready keys collection. while (i.hasNext()) { - SelectionKey sk = i.next(); + final SelectionKey sk = i.next(); i.remove(); if (!sk.isValid()) { if (s_logger.isTraceEnabled()) { s_logger.trace("Selection Key is invalid: " + sk.toString()); } - Link link = (Link)sk.attachment(); + final Link link = (Link)sk.attachment(); if (link != null) { link.terminated(); } else { @@ -167,13 +162,18 @@ public abstract class NioConnection implements Runnable { s_logger.trace("Keys Done Processing."); processTodos(); - } catch (Throwable e) { - s_logger.warn("Caught an exception but continuing on.", e); + } catch (final ClosedSelectorException e) { + /* + * Exception occurred when calling java.nio.channels.Selector.selectedKeys() method. It means the connection has not yet been established. Let's continue trying + * We do not log it here otherwise we will fill the disk with messages. + */ + } catch (final IOException e) { + s_logger.error("Agent will die due to this IOException!", e); + throw new NioConnectionException(e.getMessage(), e); } } - synchronized (_thread) { - _isStartup = false; - } + _isStartup = false; + return true; } abstract void init() throws IOException; @@ -182,11 +182,11 @@ public abstract class NioConnection implements Runnable { abstract void unregisterLink(InetSocketAddress saddr); - protected void accept(SelectionKey key) throws IOException { - ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); + protected void accept(final SelectionKey key) throws IOException { + final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); - SocketChannel socketChannel = serverSocketChannel.accept(); - Socket socket = socketChannel.socket(); + final SocketChannel socketChannel = serverSocketChannel.accept(); + final Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (s_logger.isTraceEnabled()) { @@ -198,7 +198,7 @@ public abstract class NioConnection implements Runnable { SSLEngine sslEngine = null; try { - SSLContext sslContext = Link.initSSLContext(false); + final SSLContext sslContext = Link.initSSLContext(false); sslEngine = sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(false); @@ -206,7 +206,7 @@ public abstract class NioConnection implements Runnable { Link.doHandshake(socketChannel, sslEngine, false); - } catch (Exception e) { + } catch (final Exception e) { if (s_logger.isTraceEnabled()) { s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); } @@ -219,53 +219,68 @@ public abstract class NioConnection implements Runnable { s_logger.trace("SSL: Handshake done"); } socketChannel.configureBlocking(false); - InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); - Link link = new Link(saddr, this); + final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); + final Link link = new Link(saddr, this); link.setSSLEngine(sslEngine); link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); - Task task = _factory.create(Task.Type.CONNECT, link, null); + final Task task = _factory.create(Task.Type.CONNECT, link, null); registerLink(saddr, link); - _executor.execute(task); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } } - protected void terminate(SelectionKey key) { - Link link = (Link)key.attachment(); + protected void terminate(final SelectionKey key) { + final Link link = (Link)key.attachment(); closeConnection(key); if (link != null) { link.terminated(); - Task task = _factory.create(Task.Type.DISCONNECT, link, null); + final Task task = _factory.create(Task.Type.DISCONNECT, link, null); unregisterLink(link.getSocketAddress()); - _executor.execute(task); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } } } - protected void read(SelectionKey key) throws IOException { - Link link = (Link)key.attachment(); + protected void read(final SelectionKey key) throws IOException { + final Link link = (Link)key.attachment(); try { - SocketChannel socketChannel = (SocketChannel)key.channel(); + final SocketChannel socketChannel = (SocketChannel)key.channel(); if (s_logger.isTraceEnabled()) { s_logger.trace("Reading from: " + socketChannel.socket().toString()); } - byte[] data = link.read(socketChannel); + final byte[] data = link.read(socketChannel); if (data == null) { if (s_logger.isTraceEnabled()) { s_logger.trace("Packet is incomplete. Waiting for more."); } return; } - Task task = _factory.create(Task.Type.DATA, link, data); - _executor.execute(task); - } catch (Exception e) { + final Task task = _factory.create(Task.Type.DATA, link, data); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } + } catch (final Exception e) { logDebug(e, key, 1); terminate(key); } } - protected void logTrace(Exception e, SelectionKey key, int loc) { + protected void logTrace(final Exception e, final SelectionKey key, final int loc) { if (s_logger.isTraceEnabled()) { Socket socket = null; if (key != null) { - SocketChannel ch = (SocketChannel)key.channel(); + final SocketChannel ch = (SocketChannel)key.channel(); if (ch != null) { socket = ch.socket(); } @@ -275,11 +290,11 @@ public abstract class NioConnection implements Runnable { } } - protected void logDebug(Exception e, SelectionKey key, int loc) { + protected void logDebug(final Exception e, final SelectionKey key, final int loc) { if (s_logger.isDebugEnabled()) { Socket socket = null; if (key != null) { - SocketChannel ch = (SocketChannel)key.channel(); + final SocketChannel ch = (SocketChannel)key.channel(); if (ch != null) { socket = ch.socket(); } @@ -304,113 +319,122 @@ public abstract class NioConnection implements Runnable { s_logger.trace("Todos Processing: " + todos.size()); } SelectionKey key; - for (ChangeRequest todo : todos) { + for (final ChangeRequest todo : todos) { switch (todo.type) { - case ChangeRequest.CHANGEOPS: - try { - key = (SelectionKey)todo.key; - if (key != null && key.isValid()) { - if (todo.att != null) { - key.attach(todo.att); - Link link = (Link)todo.att; - link.setKey(key); - } - key.interestOps(todo.ops); - } - } catch (CancelledKeyException e) { - s_logger.debug("key has been cancelled"); - } - break; - case ChangeRequest.REGISTER: - try { - key = ((SocketChannel)(todo.key)).register(_selector, todo.ops, todo.att); + case ChangeRequest.CHANGEOPS: + try { + key = (SelectionKey)todo.key; + if (key != null && key.isValid()) { if (todo.att != null) { - Link link = (Link)todo.att; + key.attach(todo.att); + final Link link = (Link)todo.att; link.setKey(key); } - } catch (ClosedChannelException e) { - s_logger.warn("Couldn't register socket: " + todo.key); - try { - ((SocketChannel)(todo.key)).close(); - } catch (IOException ignore) { - s_logger.info("[ignored] socket channel"); - } finally { - Link link = (Link)todo.att; - link.terminated(); - } + key.interestOps(todo.ops); } - break; - case ChangeRequest.CLOSE: - if (s_logger.isTraceEnabled()) { - s_logger.trace("Trying to close " + todo.key); + } catch (final CancelledKeyException e) { + s_logger.debug("key has been cancelled"); + } + break; + case ChangeRequest.REGISTER: + try { + key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att); + if (todo.att != null) { + final Link link = (Link)todo.att; + link.setKey(key); } - key = (SelectionKey)todo.key; - closeConnection(key); - if (key != null) { - Link link = (Link)key.attachment(); - if (link != null) { - link.terminated(); - } + } catch (final ClosedChannelException e) { + s_logger.warn("Couldn't register socket: " + todo.key); + try { + ((SocketChannel)todo.key).close(); + } catch (final IOException ignore) { + s_logger.info("[ignored] socket channel"); + } finally { + final Link link = (Link)todo.att; + link.terminated(); + } + } + break; + case ChangeRequest.CLOSE: + if (s_logger.isTraceEnabled()) { + s_logger.trace("Trying to close " + todo.key); + } + key = (SelectionKey)todo.key; + closeConnection(key); + if (key != null) { + final Link link = (Link)key.attachment(); + if (link != null) { + link.terminated(); } - break; - default: - s_logger.warn("Shouldn't be here"); - throw new RuntimeException("Shouldn't be here"); + } + break; + default: + s_logger.warn("Shouldn't be here"); + throw new RuntimeException("Shouldn't be here"); } } s_logger.trace("Todos Done processing"); } - protected void connect(SelectionKey key) throws IOException { - SocketChannel socketChannel = (SocketChannel)key.channel(); + protected void connect(final SelectionKey key) throws IOException { + final SocketChannel socketChannel = (SocketChannel)key.channel(); try { socketChannel.finishConnect(); key.interestOps(SelectionKey.OP_READ); - Socket socket = socketChannel.socket(); + final Socket socket = socketChannel.socket(); if (!socket.getKeepAlive()) { socket.setKeepAlive(true); } if (s_logger.isDebugEnabled()) { s_logger.debug("Connected to " + socket); } - Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); + final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); link.setKey(key); key.attach(link); - Task task = _factory.create(Task.Type.CONNECT, link, null); - _executor.execute(task); - } catch (IOException e) { + final Task task = _factory.create(Task.Type.CONNECT, link, null); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } + } catch (final IOException e) { logTrace(e, key, 2); terminate(key); } } - protected void scheduleTask(Task task) { - _executor.execute(task); + protected void scheduleTask(final Task task) { + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } } - protected void write(SelectionKey key) throws IOException { - Link link = (Link)key.attachment(); + protected void write(final SelectionKey key) throws IOException { + final Link link = (Link)key.attachment(); try { if (s_logger.isTraceEnabled()) { s_logger.trace("Writing to " + link.getSocketAddress().toString()); } - boolean close = link.write((SocketChannel)key.channel()); + final boolean close = link.write((SocketChannel)key.channel()); if (close) { closeConnection(key); link.terminated(); } else { key.interestOps(SelectionKey.OP_READ); } - } catch (Exception e) { + } catch (final Exception e) { logDebug(e, key, 3); terminate(key); } } - protected void closeConnection(SelectionKey key) { + protected void closeConnection(final SelectionKey key) { if (key != null) { - SocketChannel channel = (SocketChannel)key.channel(); + final SocketChannel channel = (SocketChannel)key.channel(); key.cancel(); try { if (channel != null) { @@ -419,30 +443,30 @@ public abstract class NioConnection implements Runnable { } channel.close(); } - } catch (IOException ignore) { + } catch (final IOException ignore) { s_logger.info("[ignored] channel"); } } } - public void register(int ops, SocketChannel key, Object att) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att); + public void register(final int ops, final SocketChannel key, final Object att) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att); synchronized (this) { _todos.add(todo); } _selector.wakeup(); } - public void change(int ops, SelectionKey key, Object att) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att); + public void change(final int ops, final SelectionKey key, final Object att) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att); synchronized (this) { _todos.add(todo); } _selector.wakeup(); } - public void close(SelectionKey key) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null); + public void close(final SelectionKey key) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null); synchronized (this) { _todos.add(todo); } @@ -466,7 +490,7 @@ public abstract class NioConnection implements Runnable { public int ops; public Object att; - public ChangeRequest(Object key, int type, int ops, Object att) { + public ChangeRequest(final Object key, final int type, final int ops, final Object att) { this.key = key; this.type = type; this.ops = ops; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/NioServer.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 98a4a51..539c2bb 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -37,7 +37,7 @@ public class NioServer extends NioConnection { protected WeakHashMap _links; - public NioServer(String name, int port, int workers, HandlerFactory factory) { + public NioServer(final String name, final int port, final int workers, final HandlerFactory factory) { super(name, port, workers, factory); _localAddr = null; _links = new WeakHashMap(1024); @@ -68,12 +68,12 @@ public class NioServer extends NioConnection { } @Override - protected void registerLink(InetSocketAddress addr, Link link) { + protected void registerLink(final InetSocketAddress addr, final Link link) { _links.put(addr, link); } @Override - protected void unregisterLink(InetSocketAddress saddr) { + protected void unregisterLink(final InetSocketAddress saddr) { _links.remove(saddr); } @@ -86,8 +86,8 @@ public class NioServer extends NioConnection { * @param data * @return null if not sent. attach object in link if sent. */ - public Object send(InetSocketAddress saddr, byte[] data) throws ClosedChannelException { - Link link = _links.get(saddr); + public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException { + final Link link = _links.get(saddr); if (link == null) { return null; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/main/java/com/cloud/utils/nio/Task.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/Task.java b/utils/src/main/java/com/cloud/utils/nio/Task.java index c77c703..60228ef 100644 --- a/utils/src/main/java/com/cloud/utils/nio/Task.java +++ b/utils/src/main/java/com/cloud/utils/nio/Task.java @@ -19,14 +19,14 @@ package com.cloud.utils.nio; -import org.apache.log4j.Logger; +import java.util.concurrent.Callable; + +import com.cloud.utils.exception.TaskExecutionException; /** * Task represents one todo item for the AgentManager or the AgentManager - * */ -public abstract class Task implements Runnable { - private static final Logger s_logger = Logger.getLogger(Task.class); +public abstract class Task implements Callable { public enum Type { CONNECT, // Process a new connection. @@ -40,13 +40,13 @@ public abstract class Task implements Runnable { Type _type; Link _link; - public Task(Type type, Link link, byte[] data) { + public Task(final Type type, final Link link, final byte[] data) { _data = data; _type = type; _link = link; } - public Task(Type type, Link link, Object data) { + public Task(final Type type, final Link link, final Object data) { _data = data; _type = type; _link = link; @@ -76,14 +76,11 @@ public abstract class Task implements Runnable { return _type.toString(); } - abstract protected void doTask(Task task) throws Exception; + abstract protected void doTask(Task task) throws TaskExecutionException; @Override - public final void run() { - try { - doTask(this); - } catch (Throwable e) { - s_logger.warn("Caught the following exception but pushing on", e); - } + public Boolean call() throws TaskExecutionException { + doTask(this); + return true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/utils/src/test/java/com/cloud/utils/testcase/NioTest.java ---------------------------------------------------------------------- diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index fc16684..d8510cf 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -27,6 +27,7 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.junit.Assert; +import com.cloud.utils.exception.NioConnectionException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.NioClient; @@ -56,7 +57,7 @@ public class NioTest extends TestCase { private boolean isTestsDone() { boolean result; synchronized (this) { - result = (_testCount == _completedCount); + result = _testCount == _completedCount; } return result; } @@ -81,16 +82,24 @@ public class NioTest extends TestCase { _completedCount = 0; _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer()); - _server.start(); + try { + _server.start(); + } catch (final NioConnectionException e) { + fail(e.getMessage()); + } _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient()); - _client.start(); + try { + _client.start(); + } catch (final NioConnectionException e) { + fail(e.getMessage()); + } while (_clientLink == null) { try { s_logger.debug("Link is not up! Waiting ..."); Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } @@ -101,9 +110,9 @@ public class NioTest extends TestCase { public void tearDown() { while (!isTestsDone()) { try { - s_logger.debug(this._completedCount + "/" + this._testCount + " tests done. Waiting for completion"); + s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion"); Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } @@ -122,7 +131,7 @@ public class NioTest extends TestCase { s_logger.info("Server stopped."); } - protected void setClientLink(Link link) { + protected void setClientLink(final Link link) { _clientLink = link; } @@ -140,13 +149,13 @@ public class NioTest extends TestCase { getOneMoreTest(); _clientLink.send(_testBytes); s_logger.info("Client: Data sent"); - } catch (ClosedChannelException e) { + } catch (final ClosedChannelException e) { // TODO Auto-generated catch block e.printStackTrace(); } } - protected void doServerProcess(byte[] data) { + protected void doServerProcess(final byte[] data) { oneMoreTestDone(); Assert.assertArrayEquals(_testBytes, data); s_logger.info("Verify done."); @@ -155,13 +164,13 @@ public class NioTest extends TestCase { public class NioTestClient implements HandlerFactory { @Override - public Task create(Type type, Link link, byte[] data) { + public Task create(final Type type, final Link link, final byte[] data) { return new NioTestClientHandler(type, link, data); } public class NioTestClientHandler extends Task { - public NioTestClientHandler(Type type, Link link, byte[] data) { + public NioTestClientHandler(final Type type, final Link link, final byte[] data) { super(type, link, data); } @@ -186,13 +195,13 @@ public class NioTest extends TestCase { public class NioTestServer implements HandlerFactory { @Override - public Task create(Type type, Link link, byte[] data) { + public Task create(final Type type, final Link link, final byte[] data) { return new NioTestServerHandler(type, link, data); } public class NioTestServerHandler extends Task { - public NioTestServerHandler(Type type, Link link, byte[] data) { + public NioTestServerHandler(final Type type, final Link link, final byte[] data) { super(type, link, data); }