hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject [1/3] hama git commit: Changed netty transport for async communication from NIO to EPOLL
Date Thu, 17 Mar 2016 00:03:43 GMT
Repository: hama
Updated Branches:
  refs/heads/master 64d7254fa -> f30e0c410


Changed netty transport for async communication from NIO to EPOLL


Project: http://git-wip-us.apache.org/repos/asf/hama/repo
Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/1dc5eaa2
Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/1dc5eaa2
Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/1dc5eaa2

Branch: refs/heads/master
Commit: 1dc5eaa2e5eeacb7ddc6571098a6c82117ec3745
Parents: 583c69b
Author: JongYoon Lim <seedengine@gmail.com>
Authored: Wed Mar 16 12:12:31 2016 +1300
Committer: JongYoon Lim <seedengine@gmail.com>
Committed: Wed Mar 16 12:12:31 2016 +1300

----------------------------------------------------------------------
 .../java/org/apache/hama/ipc/AsyncClient.java   | 47 ++++++++------------
 .../java/org/apache/hama/ipc/AsyncServer.java   | 13 +++---
 2 files changed, 25 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hama/blob/1dc5eaa2/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
index ba0266a..878c85f 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
@@ -20,24 +20,29 @@ package org.apache.hama.ipc;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
 
+import javax.net.SocketFactory;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.ConnectException;
@@ -51,20 +56,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BSPNetUtils;
-
 /**
  * A client for an IPC service using netty. IPC calls take a single
  * {@link Writable} as a parameter, and return a {@link Writable} as their
@@ -186,7 +177,7 @@ public class AsyncClient {
      * @throws IOException
      */
     public Connection(ConnectionId remoteId) throws IOException {
-      group = new NioEventLoopGroup();
+      group = new EpollEventLoopGroup();
       bootstrap = new Bootstrap();
       this.remoteId = remoteId;
       this.serverAddress = remoteId.getAddress();
@@ -280,12 +271,12 @@ public class AsyncClient {
           }
 
           // Configure the client.
-          // NioEventLoopGroup is a multithreaded event loop that handles I/O
+          // EpollEventLoopGroup is a multithreaded event loop that handles I/O
           // operation
-          group = new NioEventLoopGroup();
+          group = new EpollEventLoopGroup();
           // Bootstrap is a helper class that sets up a client
           bootstrap = new Bootstrap();
-          bootstrap.group(group).channel(NioSocketChannel.class)
+          bootstrap.group(group).channel(EpollSocketChannel.class)
               .option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
               .option(ChannelOption.SO_KEEPALIVE, true)
               .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)

http://git-wip-us.apache.org/repos/asf/hama/blob/1dc5eaa2/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 93627aa..67ad5d0 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -20,12 +20,12 @@ package org.apache.hama.ipc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +48,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.*;
-import java.util.concurrent.Future;
 
 /**
  * An abstract IPC service using netty. IPC calls take a single {@link Writable}
@@ -82,8 +81,8 @@ public abstract class AsyncServer {
   private int port; // port we listen on
   private Class<? extends Writable> paramClass; // class of call parameters
   // Configure the server.(constructor is thread num)
-  private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
+  private EventLoopGroup workerGroup = new EpollEventLoopGroup();
  private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
   private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
 
@@ -192,7 +191,7 @@ public abstract class AsyncServer {
       // ServerBootstrap is a helper class that sets up a server
       ServerBootstrap b = new ServerBootstrap();
       b.group(bossGroup, workerGroup)
-          .channel(NioServerSocketChannel.class)
+          .channel(EpollServerSocketChannel.class)
           .option(ChannelOption.SO_BACKLOG, backlogLength)
           .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
           .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)


Mime
View raw message