giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [6/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:30 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
new file mode 100644
index 0000000..c6f47bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+/**
+ * Maintains multiple channels and rotates between them.  This is thread-safe.
+ */
+public class ChannelRotater {
+  /** Index of last used channel */
+  private int index = 0;
+  /** Channel list */
+  private final List<Channel> channelList = Lists.newArrayList();
+  /** Task id of this channel */
+  private final Integer taskId;
+
+  /**
+   * Constructor
+   *
+   * @param taskId Id of the task these channels are associated with
+   */
+  public ChannelRotater(Integer taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Get the task id these channels are associated with
+   *
+   * @return Task id
+   */
+  public Integer getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * Add a channel to the rotation
+   *
+   * @param channel Channel to add
+   */
+  public synchronized void addChannel(Channel channel) {
+    // Every access to channelList is guarded by this object's monitor
+    // (all methods are synchronized), so no extra lock on the list is needed
+    channelList.add(channel);
+  }
+
+  /**
+   * Get the next channel
+   *
+   * @return Next channel
+   * @throws IllegalArgumentException If no channels exist
+   */
+  public synchronized Channel nextChannel() {
+    if (channelList.isEmpty()) {
+      throw new IllegalArgumentException("nextChannel: No channels exist!");
+    }
+
+    // Advance the rotation index, wrapping back to the start of the list
+    ++index;
+    if (index >= channelList.size()) {
+      index = 0;
+    }
+    return channelList.get(index);
+  }
+
+  /**
+   * Remove a channel
+   *
+   * @param channel Channel to remove
+   * @return Return true if successful, false otherwise
+   */
+  public synchronized boolean removeChannel(Channel channel) {
+    boolean success = channelList.remove(channel);
+    // Keep the rotation index in bounds after the list shrank
+    if (index >= channelList.size()) {
+      index = 0;
+    }
+    return success;
+  }
+
+  /**
+   * Get the number of channels in this object
+   *
+   * @return Number of channels
+   */
+  public synchronized int size() {
+    return channelList.size();
+  }
+
+  /**
+   * Close the channels
+   *
+   * @param channelFutureListener If desired, pass in a channel future listener
+   */
+  public synchronized void closeChannels(
+      ChannelFutureListener channelFutureListener) {
+    for (Channel channel : channelList) {
+      ChannelFuture channelFuture = channel.close();
+      if (channelFutureListener != null) {
+        channelFuture.addListener(channelFutureListener);
+      }
+    }
+  }
+
+  /**
+   * Get a copy of the channels
+   *
+   * @return Copy of the channels
+   */
+  public synchronized Iterable<Channel> getChannels() {
+    return Lists.newArrayList(channelList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
new file mode 100644
index 0000000..c66c819
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -0,0 +1,801 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
+import org.apache.giraph.comm.netty.handler.ClientRequestId;
+import org.apache.giraph.comm.netty.handler.RequestEncoder;
+import org.apache.giraph.comm.netty.handler.RequestInfo;
+import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
+/*if_not[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslClientHandler;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+/*end[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.TimedLogger;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Netty client for sending requests.  Thread-safe.
+ */
+public class NettyClient {
+  /** Do we have a limit on number of open requests we can have */
+  public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
+      "giraph.waitForRequestsConfirmation";
+  /** Default choice about having a limit on number of open requests */
+  public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;
+  /** Maximum number of requests without confirmation we should have */
+  public static final String MAX_NUMBER_OF_OPEN_REQUESTS =
+      "giraph.maxNumberOfOpenRequests";
+  /** Default maximum number of requests without confirmation */
+  public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
+  /** Maximum number of requests to list (for debugging) */
+  public static final int MAX_REQUESTS_TO_LIST = 10;
+  /** 30 seconds to connect by default */
+  public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
+/*if_not[HADOOP_NON_SECURE]*/
+  /** Used to authenticate with other workers acting as servers */
+  public static final ChannelLocal<SaslNettyClient> SASL =
+      new ChannelLocal<SaslNettyClient>();
+/*end[HADOOP_NON_SECURE]*/
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(NettyClient.class);
+  /** Context used to report progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Client bootstrap */
+  private final ClientBootstrap bootstrap;
+  /**
+   * Map of the peer connections, mapping from remote socket address to client
+   * meta data
+   */
+  private final ConcurrentMap<InetSocketAddress, ChannelRotater>
+  addressChannelMap = new MapMaker().makeMap();
+  /**
+   * Map from task id to address of its server
+   */
+  private final Map<Integer, InetSocketAddress> taskIdAddressMap =
+      new MapMaker().makeMap();
+  /**
+   * Request map of client request ids to request information.
+   */
+  private final ConcurrentMap<ClientRequestId, RequestInfo>
+  clientRequestIdRequestInfoMap;
+  /** Number of channels per server */
+  private final int channelsPerServer;
+  /** Byte counter for this client */
+  private final ByteCounter byteCounter = new ByteCounter();
+  /** Send buffer size */
+  private final int sendBufferSize;
+  /** Receive buffer size */
+  private final int receiveBufferSize;
+  /** Do we have a limit on number of open requests */
+  private final boolean limitNumberOfOpenRequests;
+  /** Maximum number of requests without confirmation we can have */
+  private final int maxNumberOfOpenRequests;
+  /** Maximum number of connection failures */
+  private final int maxConnectionFailures;
+  /** Maximum number of milliseconds for a request */
+  private final int maxRequestMilliseconds;
+  /** Waiting interval for checking outstanding requests (msecs) */
+  private final int waitingRequestMsecs;
+  /** Timed logger for printing request debugging */
+  private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
+  /** Boss factory service */
+  private final ExecutorService bossExecutorService;
+  /** Worker factory service */
+  private final ExecutorService workerExecutorService;
+  /** Address request id generator */
+  private final AddressRequestIdGenerator addressRequestIdGenerator =
+      new AddressRequestIdGenerator();
+  /** Task info */
+  private final TaskInfo myTaskInfo;
+  /** Maximum thread pool size */
+  private final int maxPoolSize;
+  /** Maximum number of attempts to resolve an address */
+  private final int maxResolveAddressAttempts;
+  /** Execution handler (if used) */
+  private final ExecutionHandler executionHandler;
+  /** Name of the handler before the execution handler (if used) */
+  private final String handlerBeforeExecutionHandler;
+
+  /**
+   * Only constructor
+   *
+   * @param context Context for progress
+   * @param conf Configuration
+   * @param myTaskInfo Current task info
+   */
+  public NettyClient(Mapper<?, ?, ?, ?>.Context context,
+                     final ImmutableClassesGiraphConfiguration conf,
+                     TaskInfo myTaskInfo) {
+    this.context = context;
+    this.myTaskInfo = myTaskInfo;
+    this.channelsPerServer = conf.getInt(
+        GiraphConstants.CHANNELS_PER_SERVER,
+        GiraphConstants.DEFAULT_CHANNELS_PER_SERVER);
+    sendBufferSize = conf.getInt(
+        GiraphConstants.CLIENT_SEND_BUFFER_SIZE,
+        GiraphConstants.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(
+        GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE,
+        GiraphConstants.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+
+    // Optionally cap the number of unacknowledged open requests;
+    // when disabled the max is recorded as -1 (unlimited)
+    limitNumberOfOpenRequests = conf.getBoolean(
+        LIMIT_NUMBER_OF_OPEN_REQUESTS,
+        LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+    if (limitNumberOfOpenRequests) {
+      maxNumberOfOpenRequests = conf.getInt(
+          MAX_NUMBER_OF_OPEN_REQUESTS,
+          MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyClient: Limit number of open requests to " +
+            maxNumberOfOpenRequests);
+      }
+    } else {
+      maxNumberOfOpenRequests = -1;
+    }
+
+    maxRequestMilliseconds = conf.getInt(
+        GiraphConstants.MAX_REQUEST_MILLISECONDS,
+        GiraphConstants.MAX_REQUEST_MILLISECONDS_DEFAULT);
+
+    maxConnectionFailures = conf.getInt(
+        GiraphConstants.NETTY_MAX_CONNECTION_FAILURES,
+        GiraphConstants.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+
+    waitingRequestMsecs = conf.getInt(
+        GiraphConstants.WAITING_REQUEST_MSECS,
+        GiraphConstants.WAITING_REQUEST_MSECS_DEFAULT);
+
+    maxPoolSize = conf.getInt(
+        GiraphConstants.NETTY_CLIENT_THREADS,
+        GiraphConstants.NETTY_CLIENT_THREADS_DEFAULT);
+
+    maxResolveAddressAttempts = conf.getInt(
+        GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+        GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+
+    // Concurrency level sized to the client thread pool to limit contention
+    clientRequestIdRequestInfoMap =
+        new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
+
+    handlerBeforeExecutionHandler = conf.get(
+        GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER,
+        GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT);
+    boolean useExecutionHandler = conf.getBoolean(
+        GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER,
+        GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT);
+    if (useExecutionHandler) {
+      int executionThreads = conf.getInt(
+          GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS,
+          GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
+      executionHandler = new ExecutionHandler(
+          new MemoryAwareThreadPoolExecutor(
+              executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+              new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d")
+                  .build()));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyClient: Using execution handler with " +
+            executionThreads + " threads after " +
+            handlerBeforeExecutionHandler + ".");
+      }
+    } else {
+      executionHandler = null;
+    }
+
+    // Thread pools that back Netty's boss and worker threads
+    bossExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "netty-client-boss-%d").build());
+    workerExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "netty-client-worker-%d").build());
+
+    // Configure the client.
+    bootstrap = new ClientBootstrap(
+        new NioClientSocketChannelFactory(
+            bossExecutorService,
+            workerExecutorService,
+            maxPoolSize));
+    bootstrap.setOption("connectTimeoutMillis",
+        MAX_CONNECTION_MILLISECONDS_DEFAULT);
+    bootstrap.setOption("tcpNoDelay", true);
+    bootstrap.setOption("keepAlive", true);
+    bootstrap.setOption("sendBufferSize", sendBufferSize);
+    bootstrap.setOption("receiveBufferSize", receiveBufferSize);
+
+    // Set up the pipeline factory.
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+/*if_not[HADOOP_NON_SECURE]*/
+        if (conf.authenticate()) {
+          LOG.info("Using Netty with authentication.");
+
+          // Our pipeline starts with just byteCounter, and then we use
+          // addLast() to incrementally add pipeline elements, so that we can
+          // name them for identification for removal or replacement after
+          // client is authenticated by server.
+          // After authentication is complete, the pipeline's SASL-specific
+          // functionality is removed, restoring the pipeline to exactly the
+          // same configuration as it would be without authentication.
+          ChannelPipeline pipeline = Channels.pipeline(
+              byteCounter);
+          // The following pipeline component is needed to decode the server's
+          // SASL tokens. It is replaced with a FixedLengthFrameDecoder (same
+          // as used with the non-authenticated pipeline) after authentication
+          // completes (as in non-auth pipeline below).
+          pipeline.addLast("length-field-based-frame-decoder",
+              new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
+          pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt(
+              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          // The following pipeline component responds to the server's SASL
+          // tokens with its own responses. Both client and server share the
+          // same Hadoop Job token, which is used to create the SASL tokens to
+          // authenticate with each other.
+          // After authentication finishes, this pipeline component is removed.
+          pipeline.addLast("sasl-client-handler",
+              new SaslClientHandler(conf));
+          pipeline.addLast("response-handler",
+              new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+          return pipeline;
+        } else {
+          LOG.info("Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+          ChannelPipeline pipeline = pipeline();
+          pipeline.addLast("clientByteCounter", byteCounter);
+          pipeline.addLast("responseFrameDecoder",
+              new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+          pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
+              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          pipeline.addLast("responseClientHandler",
+              new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+          if (executionHandler != null) {
+            pipeline.addAfter(handlerBeforeExecutionHandler,
+                "executionHandler", executionHandler);
+          }
+          return pipeline;
+/*if_not[HADOOP_NON_SECURE]*/
+        }
+/*end[HADOOP_NON_SECURE]*/
+      }
+    });
+  }
+
+  /**
+   * Pair object for connectAllAddresses().
+   */
+  private static class ChannelFutureAddress {
+    /** Future object */
+    private final ChannelFuture future;
+    /** Address of the future */
+    private final InetSocketAddress address;
+    /** Task id */
+    private final Integer taskId;
+
+    /**
+     * Constructor.
+     *
+     * @param future Immutable future
+     * @param address Immutable address
+     * @param taskId Immutable taskId
+     */
+    ChannelFutureAddress(
+        ChannelFuture future, InetSocketAddress address, Integer taskId) {
+      this.future = future;
+      this.address = address;
+      this.taskId = taskId;
+    }
+
+    @Override
+    public String toString() {
+      return "(future=" + future + ",address=" + address + ",taskId=" +
+          taskId + ")";
+    }
+  }
+
+  /**
+   * Connect to a collection of tasks servers
+   *
+   * @param tasks Tasks to connect to (if haven't already connected)
+   */
+  public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
+    // Issue all the connection attempts asynchronously first, then wait for
+    // them below, retrying failed ones up to maxConnectionFailures times
+    List<ChannelFutureAddress> waitingConnectionList =
+        Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
+    for (TaskInfo taskInfo : tasks) {
+      context.progress();
+      // Re-resolve the address when it is missing from the cache or stale
+      // (hostname or port changed for this task id)
+      InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
+      if (address == null ||
+          !address.getHostName().equals(taskInfo.getHostname()) ||
+          address.getPort() != taskInfo.getPort()) {
+        address = resolveAddress(maxResolveAddressAttempts,
+            taskInfo.getInetSocketAddress());
+        taskIdAddressMap.put(taskInfo.getTaskId(), address);
+      }
+      if (address == null || address.getHostName() == null ||
+          address.getHostName().isEmpty()) {
+        throw new IllegalStateException("connectAllAddresses: Null address " +
+            "in addresses " + tasks);
+      }
+      if (address.isUnresolved()) {
+        throw new IllegalStateException("connectAllAddresses: Unresolved " +
+            "address " + address);
+      }
+
+      // Skip servers we already have channels for
+      if (addressChannelMap.containsKey(address)) {
+        continue;
+      }
+
+      // Start connecting to the remote server up to n time
+      for (int i = 0; i < channelsPerServer; ++i) {
+        ChannelFuture connectionFuture = bootstrap.connect(address);
+
+        waitingConnectionList.add(
+            new ChannelFutureAddress(
+                connectionFuture, address, taskInfo.getTaskId()));
+      }
+    }
+
+    // Wait for all the connections to succeed up to n tries
+    int failures = 0;
+    int connected = 0;
+    while (failures < maxConnectionFailures) {
+      List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
+      for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
+        context.progress();
+        ChannelFuture future = waitingConnection.future;
+        ProgressableUtils.awaitChannelFuture(future, context);
+        if (!future.isSuccess()) {
+          LOG.warn("connectAllAddresses: Future failed " +
+              "to connect with " + waitingConnection.address + " with " +
+              failures + " failures because of " + future.getCause());
+
+          // Retry the connection; the new future is checked on the next
+          // pass of the outer while loop
+          ChannelFuture connectionFuture =
+              bootstrap.connect(waitingConnection.address);
+          nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,
+              waitingConnection.address, waitingConnection.taskId));
+          ++failures;
+        } else {
+          Channel channel = future.getChannel();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("connectAllAddresses: Connected to " +
+                channel.getRemoteAddress() + ", open = " + channel.isOpen());
+          }
+
+          if (channel.getRemoteAddress() == null) {
+            throw new IllegalStateException(
+                "connectAllAddresses: Null remote address!");
+          }
+
+          // putIfAbsent resolves the race where two threads create a
+          // rotater for the same address concurrently
+          ChannelRotater rotater =
+              addressChannelMap.get(waitingConnection.address);
+          if (rotater == null) {
+            ChannelRotater newRotater =
+                new ChannelRotater(waitingConnection.taskId);
+            rotater = addressChannelMap.putIfAbsent(
+                waitingConnection.address, newRotater);
+            if (rotater == null) {
+              rotater = newRotater;
+            }
+          }
+          rotater.addChannel(future.getChannel());
+          ++connected;
+        }
+      }
+      LOG.info("connectAllAddresses: Successfully added " +
+          (waitingConnectionList.size() - nextCheckFutures.size()) +
+          " connections, (" + connected + " total connected) " +
+          nextCheckFutures.size() + " failed, " +
+          failures + " failures total.");
+      if (nextCheckFutures.isEmpty()) {
+        break;
+      }
+      waitingConnectionList = nextCheckFutures;
+    }
+    if (failures >= maxConnectionFailures) {
+      throw new IllegalStateException(
+          "connectAllAddresses: Too many failures (" + failures + ").");
+    }
+  }
+
+/*if_not[HADOOP_NON_SECURE]*/
+  /**
+   * Authenticate all servers in addressChannelMap.
+   */
+  public void authenticate() {
+    LOG.info("authenticate: NettyClient starting authentication with " +
+        "servers.");
+    // Iterate over entries to avoid a second map lookup per address
+    for (Map.Entry<InetSocketAddress, ChannelRotater> entry :
+        addressChannelMap.entrySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("authenticate: Authenticating with address:" +
+            entry.getKey());
+      }
+      ChannelRotater channelRotater = entry.getValue();
+      for (Channel channel: channelRotater.getChannels()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticate: Authenticating with server on channel: " +
+              channel);
+        }
+        authenticateOnChannel(channelRotater.getTaskId(), channel);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("authenticate: NettyClient successfully authenticated with " +
+          addressChannelMap.size() + " server" +
+          ((addressChannelMap.size() != 1) ? "s" : "") +
+          " - continuing with normal work.");
+    }
+  }
+
+  /**
+   * Authenticate with server connected at given channel.
+   *
+   * @param taskId Task id of the channel
+   * @param channel Connection to server to authenticate with.
+   */
+  private void authenticateOnChannel(Integer taskId, Channel channel) {
+    try {
+      SaslNettyClient saslNettyClient = SASL.get(channel);
+      if (SASL.get(channel) == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
+              "for channel: " + channel);
+        }
+        saslNettyClient = new SaslNettyClient();
+        SASL.set(channel, saslNettyClient);
+      }
+      if (!saslNettyClient.isComplete()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("authenticateOnChannel: Waiting for authentication " +
+              "to complete..");
+        }
+        SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
+        sendWritableRequest(taskId, saslTokenMessage);
+        // We now wait for Netty's thread pool to communicate over this
+        // channel to authenticate with another worker acting as a server.
+        try {
+          synchronized (saslNettyClient.getAuthenticated()) {
+            while (!saslNettyClient.isComplete()) {
+              saslNettyClient.getAuthenticated().wait();
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.error("authenticateOnChannel: Interrupted while waiting for " +
+              "authentication.");
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("authenticateOnChannel: Authentication on channel: " +
+            channel + " has completed successfully.");
+      }
+    } catch (IOException e) {
+      LOG.error("authenticateOnChannel: Failed to authenticate with server " +
+          "due to error: " + e);
+    }
+    return;
+  }
+/*end[HADOOP_NON_SECURE]*/
+
+  /**
+   * Stop the client.
+   */
+  public void stop() {
+    // Close connections asynchronously, in a Netty-approved
+    // way, without cleaning up thread pools until all channels
+    // in addressChannelMap are closed (success or failure)
+    int channelCount = 0;
+    for (ChannelRotater channelRotater : addressChannelMap.values()) {
+      channelCount += channelRotater.size();
+    }
+    if (channelCount == 0) {
+      // No channels to close, so no close listener will ever fire;
+      // release the bootstrap resources right away to avoid leaking them
+      bossExecutorService.shutdownNow();
+      workerExecutorService.shutdownNow();
+      bootstrap.releaseExternalResources();
+      return;
+    }
+    final int done = channelCount;
+    final AtomicInteger count = new AtomicInteger(0);
+    for (ChannelRotater channelRotater : addressChannelMap.values()) {
+      channelRotater.closeChannels(new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture cf) {
+          context.progress();
+          // The last close callback tears down the thread pools
+          if (count.incrementAndGet() == done) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("stop: reached wait threshold, " +
+                  done + " connections closed, releasing " +
+                  "NettyClient.bootstrap resources now.");
+            }
+            bossExecutorService.shutdownNow();
+            workerExecutorService.shutdownNow();
+            bootstrap.releaseExternalResources();
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Get the next available channel, reconnecting if necessary
+   *
+   * @param remoteServer Remote server to get a channel for
+   * @return Available channel for this remote server
+   */
+  private Channel getNextChannel(InetSocketAddress remoteServer) {
+    Channel channel = addressChannelMap.get(remoteServer).nextChannel();
+    if (channel == null) {
+      throw new IllegalStateException(
+          "getNextChannel: No channel exists for " + remoteServer);
+    }
+
+    // Return this channel if it is connected
+    if (channel.isConnected()) {
+      return channel;
+    }
+
+    // Get rid of the failed channel.  removeChannel() returns true on
+    // success, so only warn when the channel was no longer in the rotation.
+    if (!addressChannelMap.get(remoteServer).removeChannel(channel)) {
+      LOG.warn("getNextChannel: Unlikely event that the channel " +
+          channel + " was already removed!");
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextChannel: Fixing disconnected channel to " +
+          remoteServer + ", open = " + channel.isOpen() + ", " +
+          "bound = " + channel.isBound());
+    }
+    // Reconnect synchronously, backing off 5 secs between attempts
+    int reconnectFailures = 0;
+    while (reconnectFailures < maxConnectionFailures) {
+      ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
+      ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+      if (connectionFuture.isSuccess()) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("getNextChannel: Connected to " + remoteServer + "!");
+        }
+        addressChannelMap.get(remoteServer).addChannel(
+            connectionFuture.getChannel());
+        return connectionFuture.getChannel();
+      }
+      ++reconnectFailures;
+      LOG.warn("getNextChannel: Failed to reconnect to " +  remoteServer +
+          " on attempt " + reconnectFailures + " out of " +
+          maxConnectionFailures + " max attempts, sleeping for 5 secs",
+          connectionFuture.getCause());
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("getNextChannel: Unexpected interrupted exception", e);
+      }
+    }
+    throw new IllegalStateException("getNextChannel: Failed to connect " +
+        "to " + remoteServer + " in " + reconnectFailures +
+        " connect attempts");
+  }
+
+  /**
+   * Send a request to a remote server (should be already connected)
+   *
+   * @param destTaskId Destination task id
+   * @param request Request to send
+   */
+  public void sendWritableRequest(Integer destTaskId,
+      WritableRequest request) {
+    InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
+    // With no outstanding requests, start a fresh byte-counter window
+    if (clientRequestIdRequestInfoMap.isEmpty()) {
+      byteCounter.resetAll();
+    }
+    boolean registerRequest = true;
+/*if_not[HADOOP_NON_SECURE]*/
+    // SASL token requests are not registered in the open-request map, so
+    // they are not tracked for completion like regular requests
+    if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
+      registerRequest = false;
+    }
+/*end[HADOOP_NON_SECURE]*/
+
+    Channel channel = getNextChannel(remoteServer);
+    RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
+    if (registerRequest) {
+      request.setClientId(myTaskInfo.getTaskId());
+      request.setRequestId(
+        addressRequestIdGenerator.getNextRequestId(remoteServer));
+      ClientRequestId clientRequestId =
+        new ClientRequestId(destTaskId, request.getRequestId());
+      // A duplicate (destination, request id) pair indicates a bug
+      RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+        clientRequestId, newRequestInfo);
+      if (oldRequestInfo != null) {
+        throw new IllegalStateException("sendWritableRequest: Impossible to " +
+          "have a previous request id = " + request.getRequestId() + ", " +
+          "request info of " + oldRequestInfo);
+      }
+    }
+    ChannelFuture writeFuture = channel.write(request);
+    newRequestInfo.setWriteFuture(writeFuture);
+
+    // Throttle when too many requests are still unacknowledged
+    if (limitNumberOfOpenRequests &&
+        clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
+      waitSomeRequests(maxNumberOfOpenRequests);
+    }
+  }
+
+  /**
+   * Ensure all the requests sent so far are complete, then log the
+   * accumulated byte counter metrics.  Blocks until the open request map
+   * is empty; interruption while waiting is handled (and logged) inside
+   * {@link #waitSomeRequests(int)}, so no checked exception escapes.
+   */
+  public void waitAllRequests() {
+    waitSomeRequests(0);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("waitAllRequests: Finished all requests. " +
+          byteCounter.getMetrics());
+    }
+  }
+
+  /**
+   * Ensure that at most maxOpenRequests are not complete.  Periodically,
+   * check the state of every request.  If we find the connection failed,
+   * re-establish it and re-send the request.
+   *
+   * @param maxOpenRequests Maximum number of requests which can be not
+   *                        complete
+   */
+  private void waitSomeRequests(int maxOpenRequests) {
+    // Requests found failed or stalled during the scan below; re-registered
+    // and re-sent at the end of each loop iteration
+    List<ClientRequestId> addedRequestIds = Lists.newArrayList();
+    List<RequestInfo> addedRequestInfos = Lists.newArrayList();
+
+    while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
+      // Wait for requests to complete for some time
+      if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
+        LOG.info("waitSomeRequests: Waiting interval of " +
+            waitingRequestMsecs + " msecs, " +
+            clientRequestIdRequestInfoMap.size() +
+            " open requests, waiting for it to be <= " + maxOpenRequests +
+            ", " + byteCounter.getMetrics());
+
+        // Only enumerate outstanding requests when there are few enough
+        // that logging each one is useful
+        if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+          for (Map.Entry<ClientRequestId, RequestInfo> entry :
+              clientRequestIdRequestInfoMap.entrySet()) {
+            LOG.info("waitSomeRequests: Waiting for request " +
+                entry.getKey() + " - " + entry.getValue());
+          }
+        }
+      }
+      // Re-check the size under the monitor before blocking so a
+      // completion between the loop test and here is not missed
+      synchronized (clientRequestIdRequestInfoMap) {
+        if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {
+          break;
+        }
+        try {
+          // NOTE(review): presumably the response handler notifies on this
+          // map when a request completes -- the timeout bounds the wait
+          // regardless
+          clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
+        } catch (InterruptedException e) {
+          LOG.error("waitFutures: Got unexpected InterruptedException", e);
+        }
+      }
+      // Make sure that waiting doesn't kill the job
+      context.progress();
+
+      // Check all the requests for problems
+      for (Map.Entry<ClientRequestId, RequestInfo> entry :
+          clientRequestIdRequestInfoMap.entrySet()) {
+        RequestInfo requestInfo = entry.getValue();
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore, request failed, or the request is taking
+        // too long, re-establish and resend
+        if (!writeFuture.getChannel().isConnected() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+          LOG.warn("waitSomeRequests: Problem with request id " +
+              entry.getKey() + " connected = " +
+              writeFuture.getChannel().isConnected() +
+              ", future done = " + writeFuture.isDone() + ", " +
+              "success = " + writeFuture.isSuccess() + ", " +
+              "cause = " + writeFuture.getCause() + ", " +
+              "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
+              "destination = " + writeFuture.getChannel().getRemoteAddress() +
+              " " + requestInfo);
+          addedRequestIds.add(entry.getKey());
+          // Fresh RequestInfo so elapsed time restarts for the resend
+          addedRequestInfos.add(new RequestInfo(
+              requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+        }
+      }
+
+      // Add any new requests to the system, connect if necessary, and re-send
+      for (int i = 0; i < addedRequestIds.size(); ++i) {
+        ClientRequestId requestId = addedRequestIds.get(i);
+        RequestInfo requestInfo = addedRequestInfos.get(i);
+
+        // put() returning null means the original entry was already removed
+        // (the request completed while we were scanning) -- undo the
+        // re-registration instead of resending a finished request
+        if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
+            null) {
+          LOG.warn("waitSomeRequests: Request " + requestId +
+              " completed prior to sending the next request");
+          clientRequestIdRequestInfoMap.remove(requestId);
+        }
+        InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
+        Channel channel = getNextChannel(remoteServer);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("waitSomeRequests: Re-issuing request " + requestInfo);
+        }
+        ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+        requestInfo.setWriteFuture(writeFuture);
+      }
+      addedRequestIds.clear();
+      addedRequestInfos.clear();
+    }
+  }
+
+  /**
+   * Utility method for resolving addresses.  Retries up to
+   * <code>maxResolveAddressAttempts</code> times, sleeping 5 seconds between
+   * attempts, re-creating the InetSocketAddress each time to force a fresh
+   * DNS lookup.
+   *
+   * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
+   *        address
+   * @param address The address we are attempting to resolve
+   * @return The successfully resolved address.
+   * @throws IllegalStateException if the address is not resolved
+   *         in <code>maxResolveAddressAttempts</code> tries.
+   */
+  private static InetSocketAddress resolveAddress(
+      int maxResolveAddressAttempts, InetSocketAddress address) {
+    int resolveAttempts = 0;
+    while (address.isUnresolved() &&
+        resolveAttempts < maxResolveAddressAttempts) {
+      ++resolveAttempts;
+      LOG.warn("resolveAddress: Failed to resolve " + address +
+          " on attempt " + resolveAttempts + " of " +
+          maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("resolveAddress: Interrupted.", e);
+      }
+      // Recreate the address to trigger another resolution attempt
+      address = new InetSocketAddress(address.getHostName(),
+          address.getPort());
+    }
+    // Only fail if the address is still unresolved; checking the attempt
+    // counter alone would wrongly throw when resolution succeeded on the
+    // final attempt
+    if (address.isUnresolved()) {
+      throw new IllegalStateException("resolveAddress: Couldn't " +
+          "resolve " + address + " in " +  resolveAttempts + " tries.");
+    }
+    return address;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
new file mode 100644
index 0000000..c575172
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.SendAggregatorCache;
+import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
+import org.apache.giraph.graph.Aggregator;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+
+/**
+ * Netty implementation of {@link MasterClient}.  The master uses this to
+ * ship aggregator data to the workers that own each aggregator.
+ */
+public class NettyMasterClient implements MasterClient {
+  /** Underlying Netty client performing the actual network I/O */
+  private final NettyClient nettyClient;
+  /** Master service, used to look up worker information per superstep */
+  private CentralizedServiceMaster<?, ?, ?, ?> service;
+  /** Per-worker cache of serialized aggregator data awaiting a flush */
+  private final SendAggregatorCache sendAggregatorCache =
+      new SendAggregatorCache();
+  /** Upper bound on the size of a single aggregator request */
+  private final int maxBytesPerAggregatorRequest;
+  /** Progressable used to report progress */
+  private final Progressable progressable;
+
+  /**
+   * Constructor
+   *
+   * @param context Context from mapper
+   * @param configuration Configuration
+   * @param service Centralized service
+   */
+  public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
+                           ImmutableClassesGiraphConfiguration configuration,
+                           CentralizedServiceMaster<?, ?, ?, ?> service) {
+    this.service = service;
+    this.progressable = context;
+    this.nettyClient =
+        new NettyClient(context, configuration, service.getMasterInfo());
+    this.maxBytesPerAggregatorRequest = configuration.getInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+  }
+
+  @Override
+  public void openConnections() {
+    nettyClient.connectAllAddresses(service.getWorkerInfoList());
+  }
+
+  @Override
+  public void sendAggregator(String aggregatorName,
+      Class<? extends Aggregator> aggregatorClass,
+      Writable aggregatedValue) throws IOException {
+    // Cache the value for the owning worker; flush once the cached data
+    // for that worker grows past the request size limit
+    WorkerInfo ownerWorker =
+        AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList());
+    int cachedBytes = sendAggregatorCache.addAggregator(
+        ownerWorker.getTaskId(), aggregatorName, aggregatorClass,
+        aggregatedValue);
+    if (cachedBytes >= maxBytesPerAggregatorRequest) {
+      flushAggregatorsToWorker(ownerWorker);
+    }
+  }
+
+  @Override
+  public void finishSendingAggregatedValues() throws IOException {
+    // Append the final count to each worker's data and push out whatever
+    // remains in the cache
+    for (WorkerInfo worker : service.getWorkerInfoList()) {
+      sendAggregatorCache.addCountAggregator(worker.getTaskId());
+      flushAggregatorsToWorker(worker);
+      progressable.progress();
+    }
+    sendAggregatorCache.reset();
+  }
+
+  /**
+   * Send aggregators from cache to worker.
+   *
+   * @param worker Worker which we want to send aggregators to
+   */
+  private void flushAggregatorsToWorker(WorkerInfo worker) {
+    int workerTaskId = worker.getTaskId();
+    byte[] cachedData = sendAggregatorCache.removeAggregators(workerTaskId);
+    nettyClient.sendWritableRequest(
+        workerTaskId, new SendAggregatorsToOwnerRequest(cachedData));
+  }
+
+  @Override
+  public void flush() {
+    nettyClient.waitAllRequests();
+  }
+
+  @Override
+  public void closeConnections() {
+    nettyClient.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
new file mode 100644
index 0000000..1f04bcf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
+import org.apache.giraph.comm.MasterServer;
+import org.apache.hadoop.util.Progressable;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Netty implementation of {@link MasterServer}.  Thin wrapper that wires a
+ * {@link NettyServer} to the master's request handler and starts it.
+ */
+public class NettyMasterServer implements MasterServer {
+  /** Netty server that does the actual I/O */
+  private final NettyServer nettyServer;
+
+  /**
+   * Constructor.  Builds the underlying server around the master's
+   * aggregator handler and starts it immediately.
+   *
+   * @param conf Hadoop configuration
+   * @param service Centralized service
+   * @param progressable Progressable for reporting progress
+   */
+  public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
+      CentralizedServiceMaster<?, ?, ?, ?> service,
+      Progressable progressable) {
+    MasterRequestServerHandler.Factory handlerFactory =
+        new MasterRequestServerHandler.Factory(service.getAggregatorHandler());
+    this.nettyServer = new NettyServer(
+        conf, handlerFactory, service.getMasterInfo(), progressable);
+    this.nettyServer.start();
+  }
+
+  @Override
+  public InetSocketAddress getMyAddress() {
+    return nettyServer.getMyAddress();
+  }
+
+  @Override
+  public void close() {
+    nettyServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
new file mode 100644
index 0000000..971c7c5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+/*if_not[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
+/*end[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.RequestDecoder;
+import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseEncoder;
+/*if_not[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+/*end[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelLocal;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * This server uses Netty and will implement all Giraph communication
+ */
+public class NettyServer {
+  /** Default maximum thread pool size */
+  public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
+
+/*if_not[HADOOP_NON_SECURE]*/
+  /** Used to authenticate with netty clients */
+  public static final ChannelLocal<SaslNettyServer>
+  CHANNEL_SASL_NETTY_SERVERS =
+    new ChannelLocal<SaslNettyServer>();
+/*end[HADOOP_NON_SECURE]*/
+
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(NettyServer.class);
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** Factory of channels */
+  private ChannelFactory channelFactory;
+  /** Accepted channels */
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+  /** Local hostname */
+  private final String localHostname;
+  /** Address of the server */
+  private InetSocketAddress myAddress;
+  /** Current task info */
+  private TaskInfo myTaskInfo;
+  /** Maximum number of threads */
+  private final int maxPoolSize;
+  /** TCP backlog */
+  private final int tcpBacklog;
+  /** Factory for {@link RequestServerHandler} */
+  private final RequestServerHandler.Factory requestServerHandlerFactory;
+/*if_not[HADOOP_NON_SECURE]*/
+  /** Factory for {@link SaslServerHandler} */
+  private SaslServerHandler.Factory saslServerHandlerFactory;
+/*end[HADOOP_NON_SECURE]*/
+  /** Server bootstrap */
+  private ServerBootstrap bootstrap;
+  /** Byte counter for this client */
+  private final ByteCounter byteCounter = new ByteCounter();
+  /** Send buffer size */
+  private final int sendBufferSize;
+  /** Receive buffer size */
+  private final int receiveBufferSize;
+  /** Boss factory service */
+  private final ExecutorService bossExecutorService;
+  /** Worker factory service */
+  private final ExecutorService workerExecutorService;
+  /** Request completed map per worker */
+  private final WorkerRequestReservedMap workerRequestReservedMap;
+  /** Execution handler (if used) */
+  private final ExecutionHandler executionHandler;
+  /** Name of the handler before the execution handler (if used) */
+  private final String handlerBeforeExecutionHandler;
+
+  /**
+   * Constructor for creating the server
+   *
+   * @param conf Configuration to use
+   * @param requestServerHandlerFactory Factory for request handlers
+   * @param myTaskInfo Current task info
+   * @param progressable Progressable for reporting progress
+   */
+  public NettyServer(ImmutableClassesGiraphConfiguration conf,
+      RequestServerHandler.Factory requestServerHandlerFactory,
+      TaskInfo myTaskInfo, Progressable progressable) {
+    this.conf = conf;
+    this.progressable = progressable;
+    this.requestServerHandlerFactory = requestServerHandlerFactory;
+    /*if_not[HADOOP_NON_SECURE]*/
+    this.saslServerHandlerFactory = new SaslServerHandler.Factory();
+    /*end[HADOOP_NON_SECURE]*/
+    this.myTaskInfo = myTaskInfo;
+    sendBufferSize = conf.getInt(
+        GiraphConstants.SERVER_SEND_BUFFER_SIZE,
+        GiraphConstants.DEFAULT_SERVER_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(
+        GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE,
+        GiraphConstants.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+
+    workerRequestReservedMap = new WorkerRequestReservedMap(conf);
+
+    bossExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "netty-server-boss-%d").build());
+    workerExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(
+            "netty-server-worker-%d").build());
+
+    try {
+      this.localHostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      // Preserve the cause so the hostname lookup failure is diagnosable
+      throw new IllegalStateException(
+          "NettyServer: unable to get hostname", e);
+    }
+
+    maxPoolSize = conf.getInt(
+        GiraphConstants.NETTY_SERVER_THREADS,
+        GiraphConstants.NETTY_SERVER_THREADS_DEFAULT);
+
+    tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG,
+        conf.getInt(GiraphConstants.MAX_WORKERS,
+            GiraphConstants.TCP_BACKLOG_DEFAULT));
+
+    channelFactory = new NioServerSocketChannelFactory(
+        bossExecutorService,
+        workerExecutorService,
+        maxPoolSize);
+
+    handlerBeforeExecutionHandler = conf.get(
+        GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER,
+        GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT);
+    boolean useExecutionHandler = conf.getBoolean(
+        GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER,
+        GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+    if (useExecutionHandler) {
+      int executionThreads = conf.getNettyServerExecutionThreads();
+      executionHandler = new ExecutionHandler(
+          new MemoryAwareThreadPoolExecutor(
+              executionThreads, 1048576, 1048576, 1, TimeUnit.HOURS,
+              new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d").
+                  build()));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyServer: Using execution handler with " +
+            executionThreads + " threads after " +
+            handlerBeforeExecutionHandler + ".");
+      }
+    } else {
+      executionHandler = null;
+    }
+  }
+
+/*if_not[HADOOP_NON_SECURE]*/
+  /**
+   * Constructor for creating the server
+   *
+   * @param conf Configuration to use
+   * @param requestServerHandlerFactory Factory for request handlers
+   * @param myTaskInfo Current task info
+   * @param progressable Progressable for reporting progress
+   * @param saslServerHandlerFactory  Factory for SASL handlers
+   */
+  public NettyServer(ImmutableClassesGiraphConfiguration conf,
+                     RequestServerHandler.Factory requestServerHandlerFactory,
+                     TaskInfo myTaskInfo,
+                     Progressable progressable,
+                     SaslServerHandler.Factory saslServerHandlerFactory) {
+    this(conf, requestServerHandlerFactory, myTaskInfo, progressable);
+    this.saslServerHandlerFactory = saslServerHandlerFactory;
+  }
+/*end[HADOOP_NON_SECURE]*/
+
+  /**
+   * Start the server with the appropriate port
+   */
+  public void start() {
+    bootstrap = new ServerBootstrap(channelFactory);
+    // Set up the pipeline factory.
+    bootstrap.setOption("child.keepAlive", true);
+    bootstrap.setOption("child.tcpNoDelay", true);
+    bootstrap.setOption("child.sendBufferSize", sendBufferSize);
+    bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+    bootstrap.setOption("backlog", tcpBacklog);
+    bootstrap.setOption("child.receiveBufferSizePredictorFactory",
+        new WrappedAdaptiveReceiveBufferSizePredictorFactory(
+            receiveBufferSize / 4,
+            receiveBufferSize,
+            receiveBufferSize));
+
+    /**
+     * Pipeline setup: depends on whether configured to use authentication
+     * or not.
+     */
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+/*if_not[HADOOP_NON_SECURE]*/
+        if (conf.authenticate()) {
+          LOG.info("start: Will use Netty pipeline with " +
+              "authentication and authorization of clients.");
+          // After a client authenticates, the two authentication-specific
+          // pipeline components SaslServerHandler and ResponseEncoder are
+          // removed, leaving the pipeline the same as in the non-authenticated
+          // configuration except for the presence of the Authorize component.
+          return Channels.pipeline(
+              byteCounter,
+              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
+              new RequestDecoder(conf, byteCounter),
+              // Removed after authentication completes:
+              saslServerHandlerFactory.newHandler(conf),
+              new AuthorizeServerHandler(),
+              requestServerHandlerFactory.newHandler(workerRequestReservedMap,
+                  conf, myTaskInfo),
+              // Removed after authentication completes:
+              new ResponseEncoder());
+        } else {
+          LOG.info("start: Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+          ChannelPipeline pipeline = pipeline();
+
+          // Store all connected channels in order to ensure that we can close
+          // them on stop(), or else stop() may hang waiting for the
+          // connections to close on their own
+          pipeline.addLast("connectedChannels",
+              new SimpleChannelUpstreamHandler() {
+                @Override
+                public void channelConnected(ChannelHandlerContext ctx,
+                    ChannelStateEvent e) throws Exception {
+                  super.channelConnected(ctx, e);
+                  accepted.add(e.getChannel());
+                }
+              });
+          pipeline.addLast("serverByteCounter", byteCounter);
+          pipeline.addLast("requestFrameDecoder",
+              new LengthFieldBasedFrameDecoder(
+                  1024 * 1024 * 1024, 0, 4, 0, 4));
+          pipeline.addLast("requestDecoder",
+              new RequestDecoder(conf, byteCounter));
+          pipeline.addLast("requestProcessor",
+              requestServerHandlerFactory.newHandler(
+                  workerRequestReservedMap, conf, myTaskInfo));
+          if (executionHandler != null) {
+            pipeline.addAfter(handlerBeforeExecutionHandler,
+                "executionHandler", executionHandler);
+          }
+          return pipeline;
+/*if_not[HADOOP_NON_SECURE]*/
+        }
+/*end[HADOOP_NON_SECURE]*/
+      }
+    });
+
+    int taskId = conf.getTaskPartition();
+    int numTasks = conf.getInt("mapred.map.tasks", 1);
+    // Number of workers + 1 for master
+    int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
+    int portIncrementConstant =
+        (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
+    int bindPort = conf.getInt(GiraphConstants.IPC_INITIAL_PORT,
+        GiraphConstants.IPC_INITIAL_PORT_DEFAULT) +
+        taskId;
+    int bindAttempts = 0;
+    final int maxIpcPortBindAttempts =
+        conf.getInt(GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS,
+            GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT);
+    final boolean failFirstPortBindingAttempt =
+        conf.getBoolean(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT,
+            GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT);
+
+    // Simple handling of port collisions on the same machine while
+    // preserving debugability from the port number alone.
+    // Round up the max number of workers to the next power of 10 and use
+    // it as a constant to increase the port number with.
+    while (bindAttempts < maxIpcPortBindAttempts) {
+      this.myAddress = new InetSocketAddress(localHostname, bindPort);
+      if (failFirstPortBindingAttempt && bindAttempts == 0) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("start: Intentionally fail first " +
+              "binding attempt as giraph.failFirstIpcPortBindAttempt " +
+              "is true, port " + bindPort);
+        }
+        ++bindAttempts;
+        bindPort += portIncrementConstant;
+        continue;
+      }
+
+      try {
+        Channel ch = bootstrap.bind(myAddress);
+        accepted.add(ch);
+
+        break;
+      } catch (ChannelException e) {
+        LOG.warn("start: Likely failed to bind on attempt " +
+            bindAttempts + " to port " + bindPort, e);
+        ++bindAttempts;
+        bindPort += portIncrementConstant;
+      }
+    }
+    if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
+      throw new IllegalStateException(
+          "start: Failed to start NettyServer with " +
+              bindAttempts + " attempts");
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("start: Started server " +
+          "communication server: " + myAddress + " with up to " +
+          maxPoolSize + " threads on bind attempt " + bindAttempts +
+          " with sendBufferSize = " + sendBufferSize +
+          " receiveBufferSize = " + receiveBufferSize + " backlog = " +
+          bootstrap.getOption("backlog"));
+    }
+  }
+
+  /**
+   * Stop the server.  Closes all accepted channels (and the listening
+   * channel), shuts down both executor services, and releases the Netty
+   * resources, reporting progress throughout so the task is not killed
+   * while waiting.
+   */
+  public void stop() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Halting netty server");
+    }
+    ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
+    bossExecutorService.shutdownNow();
+    ProgressableUtils.awaitExecutorTermination(bossExecutorService,
+        progressable);
+    workerExecutorService.shutdownNow();
+    ProgressableUtils.awaitExecutorTermination(workerExecutorService,
+        progressable);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Start releasing resources");
+    }
+    bootstrap.releaseExternalResources();
+    channelFactory.releaseExternalResources();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Netty server halted");
+    }
+  }
+
+  /**
+   * Get the address this server bound to.
+   *
+   * @return Bound address, or null if start() has not been called
+   */
+  public InetSocketAddress getMyAddress() {
+    return myAddress;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
new file mode 100644
index 0000000..6838321
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.SendAggregatedValueCache;
+import org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest;
+import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
+import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+
+/**
+ * Netty implementation of {@link WorkerAggregatorRequestProcessor}.
+ * Batches aggregated values per destination worker and ships a request
+ * once the cached data grows past the configured byte limit.
+ */
+public class NettyWorkerAggregatorRequestProcessor
+    implements WorkerAggregatorRequestProcessor {
+  /** Progressable used to report progress */
+  private final Progressable progressable;
+  /** NettyClient that could be shared among one or more instances */
+  private final WorkerClient<?, ?, ?, ?> workerClient;
+  /** Service worker */
+  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  /** Cache of serialized aggregator data, keyed by destination task id */
+  private final SendAggregatedValueCache sendAggregatedValueCache =
+      new SendAggregatedValueCache();
+  /** Upper bound (in bytes) for a single aggregator request */
+  private final int maxBytesPerAggregatorRequest;
+
+  /**
+   * Constructor.
+   *
+   * @param progressable  Progressable used to report progress
+   * @param configuration Configuration
+   * @param serviceWorker Service worker
+   */
+  public NettyWorkerAggregatorRequestProcessor(
+      Progressable progressable,
+      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> configuration,
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+    this.serviceWorker = serviceWorker;
+    this.workerClient = serviceWorker.getWorkerClient();
+    this.progressable = progressable;
+    maxBytesPerAggregatorRequest = configuration.getInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+  }
+
+  @Override
+  public boolean sendAggregatedValue(String aggregatorName,
+      Writable aggregatedValue) throws IOException {
+    WorkerInfo owner = AggregatorUtils.getOwner(
+        aggregatorName, serviceWorker.getWorkerInfoList());
+    // Values owned by this worker are handled locally, not sent.
+    if (isThisWorker(owner)) {
+      return false;
+    }
+    int cachedBytes = sendAggregatedValueCache.addAggregator(
+        owner.getTaskId(), aggregatorName, aggregatedValue);
+    // Ship the batch as soon as it reaches the configured size limit.
+    if (cachedBytes >= maxBytesPerAggregatorRequest) {
+      flushAggregatorsToWorker(owner);
+    }
+    return true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      if (isThisWorker(workerInfo)) {
+        continue;
+      }
+      // Record the per-destination aggregator count before the final send.
+      sendAggregatedValueCache.addCountAggregator(workerInfo.getTaskId());
+      flushAggregatorsToWorker(workerInfo);
+      progressable.progress();
+    }
+    sendAggregatedValueCache.reset();
+  }
+
+  /**
+   * Send aggregators from cache to worker.
+   *
+   * @param worker Worker which we want to send aggregators to
+   */
+  private void flushAggregatorsToWorker(WorkerInfo worker) {
+    byte[] data =
+        sendAggregatedValueCache.removeAggregators(worker.getTaskId());
+    workerClient.sendWritableRequest(
+        worker.getTaskId(), new SendWorkerAggregatorsRequest(data));
+  }
+
+  @Override
+  public void sendAggregatedValuesToMaster(
+      byte[] aggregatorData) throws IOException {
+    workerClient.sendWritableRequest(
+        serviceWorker.getMasterInfo().getTaskId(),
+        new SendAggregatorsToMasterRequest(aggregatorData));
+  }
+
+  @Override
+  public void distributeAggregators(
+      Iterable<byte[]> aggregatorDataList) throws IOException {
+    for (byte[] aggregatorData : aggregatorDataList) {
+      SendAggregatorsToWorkerRequest request =
+          new SendAggregatorsToWorkerRequest(aggregatorData);
+      // Each request goes to every worker except ourselves.
+      for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
+        if (!isThisWorker(worker)) {
+          workerClient.sendWritableRequest(worker.getTaskId(), request);
+        }
+        progressable.progress();
+      }
+    }
+  }
+
+  /**
+   * Check if workerInfo describes current worker.
+   *
+   * @param workerInfo Worker to check
+   * @return True iff workerInfo corresponds to current worker.
+   */
+  private boolean isThisWorker(WorkerInfo workerInfo) {
+    return serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
new file mode 100644
index 0000000..4aff7b9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Takes the user facing APIs in {@link WorkerClient} and implements them
+ * using the available {@link WritableRequest} objects.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class NettyWorkerClient<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    WorkerClient<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
+  /** Giraph configuration with the user's graph type classes resolved */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Netty client that does the actual I/O */
+  private final NettyClient nettyClient;
+  /** Centralized service, needed to get vertex ranges */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+
+  /**
+   * Only constructor.
+   *
+   * @param context Context from mapper
+   * @param configuration Configuration
+   * @param service Used to get partition mapping
+   */
+  public NettyWorkerClient(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E, M> service) {
+    this.nettyClient =
+        new NettyClient(context, configuration, service.getWorkerInfo());
+    this.conf = configuration;
+    this.service = service;
+  }
+
+  /**
+   * Get the centralized service worker this client was constructed with.
+   *
+   * @return Service worker
+   */
+  public CentralizedServiceWorker<I, V, E, M> getService() {
+    return service;
+  }
+
+  @Override
+  public void openConnections() {
+    // Connect to every other worker plus the master.
+    List<TaskInfo> addresses = Lists.newArrayListWithCapacity(
+        service.getWorkerInfoList().size());
+    for (WorkerInfo info : service.getWorkerInfoList()) {
+      // No need to connect to myself
+      if (service.getWorkerInfo().getTaskId() != info.getTaskId()) {
+        addresses.add(info);
+      }
+    }
+    addresses.add(service.getMasterInfo());
+    nettyClient.connectAllAddresses(addresses);
+  }
+
+  @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    // Delegates to the service, which owns the partition mapping.
+    return service.getVertexPartitionOwner(vertexId);
+  }
+
+  @Override
+  public void sendWritableRequest(Integer destTaskId,
+                                  WritableRequest request) {
+    nettyClient.sendWritableRequest(destTaskId, request);
+  }
+
+  @Override
+  public void waitAllRequests() {
+    nettyClient.waitAllRequests();
+  }
+
+  @Override
+  public void closeConnections() throws IOException {
+    nettyClient.stop();
+  }
+
+  // NOTE: the if/else[HADOOP_NON_SECURE] markers below look like munge
+  // preprocessor directives — only one setup() variant survives the build,
+  // depending on the Hadoop security setting.  Do not edit them casually.
+/*if[HADOOP_NON_SECURE]
+  @Override
+  public void setup() {
+    openConnections();
+  }
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void setup(boolean authenticate) {
+    openConnections();
+    if (authenticate) {
+      authenticate();
+    }
+  }
+/*end[HADOOP_NON_SECURE]*/
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+  @Override
+  public void authenticate() {
+    nettyClient.authenticate();
+  }
+/*end[HADOOP_NON_SECURE]*/
+}


Mime
View raw message