giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1370430 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/comm/
Date Tue, 07 Aug 2012 18:54:43 GMT
Author: aching
Date: Tue Aug  7 18:54:42 2012
New Revision: 1370430

URL: http://svn.apache.org/viewvc?rev=1370430&view=rev
Log:
GIRAPH-289: Add thread and channel pooling to NettyClient and
NettyServer. (ekoontz via aching)


Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug  7 18:54:42 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-289: Add thread and channel pooling to NettyClient and
+  NettyServer. (ekoontz via aching)
+
   GIRAPH-276: Fix broken tests in pseudo-distributed mode.
   (Alessandro Presta via jghoman)
 

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java?rev=1370430&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java Tue Aug  7 18:54:42 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.Collection;
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Maintains multiple channels and rotates between them
+ */
+public class ChannelRotater {
+  /** Index of last used channel */
+  private int index = 0;
+  /** Channel list */
+  private List<Channel> channelList = Lists.newArrayList();
+
+  /**
+   * Add a channel to the rotation
+   *
+   * @param channel Channel to add
+   */
+  public void addChannel(Channel channel) {
+    channelList.add(channel);
+  }
+
+  /**
+   * Get the next channel
+   *
+   * @return Next channel
+   */
+  public Channel nextChannel() {
+    if (channelList.isEmpty()) {
+      throw new IllegalArgumentException("nextChannel: No channels exist!");
+    }
+
+    ++index;
+    if (index >= channelList.size()) {
+      index = 0;
+    }
+    return channelList.get(index);
+  }
+
+  /**
+   * Get the channels
+   *
+   * @return Collection of the channels
+   */
+  Collection<Channel> getChannels() {
+    return channelList;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug  7 18:54:42 2012
@@ -18,10 +18,10 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Maps;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -69,9 +69,10 @@ public class NettyClient<I extends Writa
    * Map of the peer connections, mapping from remote socket address to client
    * meta data
    */
-  private final Map<InetSocketAddress, Channel> addressChannelMap =
-      new HashMap<InetSocketAddress, Channel>();
-
+  private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
+      Maps.newHashMap();
+  /** Number of channels per server */
+  private final int channelsPerServer;
   /** Send buffer size */
   private final int sendBufferSize;
   /** Receive buffer size */
@@ -85,6 +86,9 @@ public class NettyClient<I extends Writa
   public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
     Configuration conf = context.getConfiguration();
+    this.channelsPerServer = conf.getInt(
+        GiraphJob.CHANNELS_PER_SERVER,
+        GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
     sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
         GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
     receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
@@ -94,7 +98,9 @@ public class NettyClient<I extends Writa
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
             Executors.newCachedThreadPool(),
-            Executors.newCachedThreadPool()));
+            Executors.newCachedThreadPool(),
+            conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+                NettyServer.DEFAULT_MAXIMUM_THREAD_POOL_SIZE)));
 
     // Set up the pipeline factory.
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -112,32 +118,52 @@ public class NettyClient<I extends Writa
    *
    * @param addresses Addresses to connect to (if haven't already connected)
    */
-  public void connectAllAdddresses(Collection<InetSocketAddress> addresses) {
+  public void connectAllAddresses(Collection<InetSocketAddress> addresses) {
     List<ChannelFuture> waitingConnectionList =
         new ArrayList<ChannelFuture>();
     for (InetSocketAddress address : addresses) {
+      if (address == null) {
+        throw new IllegalStateException("connectAllAddresses: Null address " +
+            "in addresses " + addresses);
+      }
+
       if (addressChannelMap.containsKey(address)) {
         continue;
       }
-      // Start connecting to the remote server
-      ChannelFuture connectionFuture = bootstrap.connect(address);
-      connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
-      connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
-      connectionFuture.getChannel().getConfig().setOption("sendBufferSize",
-          sendBufferSize);
-      connectionFuture.getChannel().getConfig().setOption("receiveBufferSize",
-          receiveBufferSize);
-      addressChannelMap.put(address, connectionFuture.getChannel());
 
-      waitingConnectionList.add(connectionFuture);
+      // Start connecting to the remote server up to n time
+      ChannelRotater channelRotater = new ChannelRotater();
+      for (int i = 0; i < channelsPerServer; ++i) {
+        ChannelFuture connectionFuture = bootstrap.connect(address);
+        connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
+        connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
+        connectionFuture.getChannel().getConfig().setOption(
+            "sendBufferSize", sendBufferSize);
+        connectionFuture.getChannel().getConfig().setOption(
+            "receiveBufferSize", receiveBufferSize);
+        channelRotater.addChannel(connectionFuture.getChannel());
+        waitingConnectionList.add(connectionFuture);
+      }
+      addressChannelMap.put(address, channelRotater);
     }
 
     // Wait for all the connections to succeed
     for (ChannelFuture waitingConnection : waitingConnectionList) {
-      waitingConnection.awaitUninterruptibly().getChannel();
+      ChannelFuture future =
+          waitingConnection.awaitUninterruptibly();
+      if (!future.isSuccess()) {
+        throw new IllegalStateException("connectAllAddresses: Future failed " +
+            "with " + future.getCause());
+      }
+      Channel channel = future.getChannel();
       if (LOG.isInfoEnabled()) {
-        LOG.info("connectAllAaddresses: Connected to " +
-            waitingConnection.getChannel().getRemoteAddress());
+        LOG.info("connectAllAddresses: Connected to " +
+            channel.getRemoteAddress());
+      }
+
+      if (channel.getRemoteAddress() == null) {
+        throw new IllegalStateException("connectAllAddresses: Null remote " +
+            "address!");
       }
     }
   }
@@ -149,23 +175,29 @@ public class NettyClient<I extends Writa
     // close connections asyncronously, in a Netty-approved
     // way, without cleaning up thread pools until all channels
     // in addressChannelMap are closed (success or failure)
-    final int done = addressChannelMap.size();
+    int channelCount = 0;
+    for (ChannelRotater channelRotater : addressChannelMap.values()) {
+      channelCount += channelRotater.getChannels().size();
+    }
+    final int done = channelCount;
     final AtomicInteger count = new AtomicInteger(0);
-    for (Channel channel : addressChannelMap.values()) {
-      ChannelFuture result = channel.close();
-      result.addListener(new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture cf) {
-          if (count.incrementAndGet() == done) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("stop: reached wait threshold, " +
-                done + " connections closed, releasing " +
-                "NettyClient.bootstrap resources now.");
+    for (ChannelRotater channelRotater : addressChannelMap.values()) {
+      for (Channel channel : channelRotater.getChannels()) {
+        ChannelFuture result = channel.close();
+        result.addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture cf) {
+            if (count.incrementAndGet() == done) {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("stop: reached wait threshold, " +
+                    done + " connections closed, releasing " +
+                    "NettyClient.bootstrap resources now.");
+              }
+              bootstrap.releaseExternalResources();
             }
-            bootstrap.releaseExternalResources();
           }
-        }
-      });
+        });
+      }
     }
   }
 
@@ -176,9 +208,9 @@ public class NettyClient<I extends Writa
    * @param request Request to send
    */
   public void sendWritableRequest(InetSocketAddress remoteServer,
-    WritableRequest<I, V, E, M> request) {
+                                  WritableRequest<I, V, E, M> request) {
     waitingRequestCount.incrementAndGet();
-    Channel channel = addressChannelMap.get(remoteServer);
+    Channel channel = addressChannelMap.get(remoteServer).nextChannel();
     if (channel == null) {
       throw new IllegalStateException(
           "sendWritableRequest: No channel exists for " + remoteServer);
@@ -194,11 +226,6 @@ public class NettyClient<I extends Writa
   public void waitAllRequests() {
     synchronized (waitingRequestCount) {
       while (waitingRequestCount.get() != 0) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("waitAllRequests: Waiting interval of " +
-              WAITING_REQUEST_MSECS + " msecs and still waiting on " +
-              waitingRequestCount + " requests");
-        }
         try {
           waitingRequestCount.wait(WAITING_REQUEST_MSECS);
         } catch (InterruptedException e) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug  7 18:54:42 2012
@@ -59,7 +59,7 @@ public class NettyServer<I extends Writa
      V extends Writable, E extends Writable,
      M extends Writable> {
   /** Default maximum thread pool size */
-  public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 64;
+  public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 32;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
@@ -124,16 +124,12 @@ public class NettyServer<I extends Writa
     }
     maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
                                   DEFAULT_MAXIMUM_THREAD_POOL_SIZE);
-    try {
-      workerThreadPool =
-        (ThreadPoolExecutor) Executors.newCachedThreadPool(workerFactory);
-      workerThreadPool.setMaximumPoolSize(maximumPoolSize);
-    } catch (ClassCastException e) {
-      LOG.warn("Netty worker thread pool is not of type ThreadPoolExecutor", e);
-    }
+    Executors.newCachedThreadPool(workerFactory);
+
     channelFactory = new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(bossFactory),
-        workerThreadPool);
+        Executors.newCachedThreadPool(workerFactory),
+        maximumPoolSize);
   }
 
   /**
@@ -192,6 +188,9 @@ public class NettyServer<I extends Writa
         accepted.add(ch);
         tcpNoDelay = ch.getConfig().setOption("tcpNoDelay", true);
         keepAlive = ch.getConfig().setOption("keepAlive", true);
+        ch.getConfig().setOption("sendBufferSize", sendBufferSize);
+        ch.getConfig().setOption("receiveBufferSize", receiveBufferSize);
+
         break;
       } catch (ChannelException e) {
         LOG.warn("start: Likely failed to bind on attempt " +
@@ -211,7 +210,8 @@ public class NettyServer<I extends Writa
           "communication server: " + myAddress + " with up to " +
           maximumPoolSize + " threads on bind attempt " + bindAttempts +
           " with tcpNoDelay = " + tcpNoDelay + " and keepAlive = " +
-          keepAlive);
+          keepAlive + " sendBufferSize = " + sendBufferSize +
+          " receiveBufferSize = " + receiveBufferSize);
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Tue Aug  7 18:54:42 2012
@@ -132,7 +132,7 @@ public class NettyWorkerClient<I extends
       }
       addresses.add(partitionOwner.getWorkerInfo().getHostnamePort());
     }
-    nettyClient.connectAllAdddresses(addresses);
+    nettyClient.connectAllAddresses(addresses);
   }
 
   /**
@@ -188,10 +188,10 @@ public class NettyWorkerClient<I extends
           getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
       Map<I, Collection<M>> partitionMessages =
           sendMessageCache.removePartitionMessages(partitionId);
-      WritableRequest<I, V, E, M> writableReauest =
+      WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMessagesRequest<I, V, E, M>(
               partitionId, partitionMessages);
-      nettyClient.sendWritableRequest(remoteServerAddress, writableReauest);
+      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
     }
   }
 
@@ -258,10 +258,10 @@ public class NettyWorkerClient<I extends
           getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
       Map<I, VertexMutations<I, V, E, M>> partitionMutations =
           sendMutationsCache.removePartitionMutations(partitionId);
-      WritableRequest<I, V, E, M> writableReauest =
+      WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               partitionId, partitionMutations);
-      nettyClient.sendWritableRequest(remoteServerAddress, writableReauest);
+      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
     }
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug  7 18:54:42 2012
@@ -233,6 +233,12 @@ public class GiraphJob {
   /** Default number of messages that can be bulk sent during a flush */
   public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
 
+  /** Number of channels used per server */
+  public static final String CHANNELS_PER_SERVER =
+      "giraph.channelsPerServer";
+  /** Default number of channels used per server of 1 */
+  public static final int DEFAULT_CHANNELS_PER_SERVER = 1;
+
   /** Number of flush threads per peer */
   public static final String MSG_NUM_FLUSH_THREADS =
       "giraph.msgNumFlushThreads";

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue Aug  7 18:54:42 2012
@@ -65,7 +65,7 @@ public class ConnectionTest {
     NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
         new NettyClient<IntWritable, IntWritable, IntWritable,
         IntWritable>(context);
-    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
 
     client.stop();
     server.stop();
@@ -105,7 +105,7 @@ public class ConnectionTest {
         IntWritable>(context);
     List<InetSocketAddress> serverAddresses =
         new ArrayList<InetSocketAddress>();
-    client.connectAllAdddresses(serverAddresses);
+    client.connectAllAddresses(serverAddresses);
 
     client.stop();
     server1.stop();
@@ -137,15 +137,15 @@ public class ConnectionTest {
     NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client1 =
         new NettyClient<IntWritable, IntWritable, IntWritable,
         IntWritable>(context);
-    client1.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    client1.connectAllAddresses(Collections.singleton(server.getMyAddress()));
     NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client2 =
         new NettyClient<IntWritable, IntWritable, IntWritable,
         IntWritable>(context);
-    client2.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    client2.connectAllAddresses(Collections.singleton(server.getMyAddress()));
     NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client3 =
         new NettyClient<IntWritable, IntWritable, IntWritable,
         IntWritable>(context);
-    client3.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    client3.connectAllAddresses(Collections.singleton(server.getMyAddress()));
 
     client1.stop();
     client2.stop();

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug  7 18:54:42 2012
@@ -105,7 +105,7 @@ public class RequestTest {
     client =
         new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
             (context);
-    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
   }
 
   @Test



Mime
View raw message