giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject svn commit: r1417114 - in /giraph/trunk: CHANGELOG giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Date Tue, 04 Dec 2012 18:52:31 GMT
Author: maja
Date: Tue Dec  4 18:52:30 2012
New Revision: 1417114

URL: http://svn.apache.org/viewvc?rev=1417114&view=rev
Log:
GIRAPH-441: Keep track of connected channels in NettyServer

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Dec  4 18:52:30 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-441: Keep track of connected channels in NettyServer (majakabiljo)
+
   GIRAPH-440: ProgressableUtils - TimeoutException from future.get shouldn't 
   be rethrown (majakabiljo)
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Dec  4 18:52:30 2012
@@ -47,6 +47,7 @@ import org.apache.giraph.comm.requests.S
 /*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
@@ -399,8 +400,8 @@ else[HADOOP_NON_SECURE]*/
       List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
       for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
         context.progress();
-        ChannelFuture future =
-            waitingConnection.future.awaitUninterruptibly();
+        ChannelFuture future = waitingConnection.future;
+        ProgressableUtils.awaitChannelFuture(future, context);
         if (!future.isSuccess()) {
           LOG.warn("connectAllAddresses: Future failed " +
               "to connect with " + waitingConnection.address + " with " +
@@ -596,7 +597,7 @@ else[HADOOP_NON_SECURE]*/
     int reconnectFailures = 0;
     while (reconnectFailures < maxConnectionFailures) {
       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
-      connectionFuture.awaitUninterruptibly();
+      ProgressableUtils.awaitChannelFuture(connectionFuture, context);
       if (connectionFuture.isSuccess()) {
         if (LOG.isInfoEnabled()) {
           LOG.info("getNextChannel: Connected to " + remoteServer + "!");

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Tue Dec  4 18:52:30 2012
@@ -47,10 +47,13 @@ import org.jboss.netty.bootstrap.ServerB
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -271,6 +274,18 @@ else[HADOOP_NON_SECURE]*/
 /*end[HADOOP_NON_SECURE]*/
           ChannelPipeline pipeline = pipeline();
 
+          // Store all connected channels in order to ensure that we can close
+          // them on stop(), or else stop() may hang waiting for the
+          // connections to close on their own
+          pipeline.addLast("connectedChannels",
+              new SimpleChannelUpstreamHandler() {
+                @Override
+                public void channelConnected(ChannelHandlerContext ctx,
+                    ChannelStateEvent e) throws Exception {
+                  super.channelConnected(ctx, e);
+                  accepted.add(e.getChannel());
+                }
+              });
           pipeline.addLast("serverByteCounter", byteCounter);
           pipeline.addLast("requestFrameDecoder",
               new LengthFieldBasedFrameDecoder(
@@ -363,7 +378,14 @@ else[HADOOP_NON_SECURE]*/
     }
     ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
     bossExecutorService.shutdownNow();
+    ProgressableUtils.awaitExecutorTermination(bossExecutorService,
+        progressable);
     workerExecutorService.shutdownNow();
+    ProgressableUtils.awaitExecutorTermination(workerExecutorService,
+        progressable);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Start releasing resources");
+    }
     bootstrap.releaseExternalResources();
     channelFactory.releaseExternalResources();
     if (LOG.isInfoEnabled()) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Tue Dec  4 18:52:30 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
 
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 
 import java.util.concurrent.ExecutionException;
@@ -79,6 +80,18 @@ public class ProgressableUtils {
   }
 
   /**
+   * Wait for {@link ChannelFuture} to finish, while periodically
+   * reporting progress.
+   *
+   * @param future       ChannelFuture
+   * @param progressable Progressable for reporting progress (Job context)
+   */
+  public static void awaitChannelFuture(ChannelFuture future,
+      Progressable progressable) {
+    waitForever(new ChannelFutureWaitable(future), progressable);
+  }
+
+  /**
    * Wait forever for waitable to finish. Periodically reports progress.
    *
    * @param waitable Waitable which we wait for
@@ -268,7 +281,7 @@ public class ProgressableUtils {
   }
 
   /**
-   * {@link Waitable} for waiting on a {@link ChannelGroupFutureWaitable} to
+   * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
    * terminate.
    */
   private static class ChannelGroupFutureWaitable extends
@@ -295,4 +308,32 @@ public class ProgressableUtils {
       return future.isDone();
     }
   }
+
+  /**
+   * {@link Waitable} for waiting on a {@link ChannelFuture} to
+   * terminate.
+   */
+  private static class ChannelFutureWaitable extends WaitableWithoutResult {
+    /** ChannelFuture which we want to wait for */
+    private final ChannelFuture future;
+
+    /**
+     * Constructor
+     *
+     * @param future ChannelFuture which we want to wait for
+     */
+    public ChannelFutureWaitable(ChannelFuture future) {
+      this.future = future;
+    }
+
+    @Override
+    public void waitFor(int msecs) throws InterruptedException {
+      future.await(msecs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return future.isDone();
+    }
+  }
 }



Mime
View raw message