giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [7/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:30 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
new file mode 100644
index 0000000..0bc57d2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.SendMessageCache;
+import org.apache.giraph.comm.SendMutationsCache;
+import org.apache.giraph.comm.SendPartitionCache;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
+import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
+import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.WorkerRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BspService;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.ValueGauge;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Aggregate requests and sends them to the thread-safe NettyClient.  This
+ * class is not thread-safe and expected to be used and then thrown away after
+ * a phase of communication has completed.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    WorkerClientRequestProcessor<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(NettyWorkerClientRequestProcessor.class);
+  /** Cached partitions of vertices to send */
+  private final SendPartitionCache<I, V, E, M> sendPartitionCache;
+  /** Cached map of partitions to vertex indices to messages */
+  private final SendMessageCache<I, M> sendMessageCache;
+  /** Cached map of partitions to vertex indices to mutations */
+  private final SendMutationsCache<I, V, E, M> sendMutationsCache =
+      new SendMutationsCache<I, V, E, M>();
+  /** NettyClient that could be shared among one or more instances */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** Messages sent during the last superstep */
+  private long totalMsgsSentInSuperstep = 0;
+  /** Maximum size of messages per remote worker to cache before sending */
+  private final int maxMessagesSizePerWorker;
+  /** Maximum number of mutations per partition before sending */
+  private final int maxMutationsPerPartition;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Server data from the server (used for local requests) */
+  private final ServerData<I, V, E, M> serverData;
+
+  // Per-Superstep Metrics
+  /** messages sent in a superstep */
+  private final ValueGauge<Long> msgsSentInSuperstep;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param configuration Configuration
+   * @param serviceWorker Service worker
+   */
+  public NettyWorkerClientRequestProcessor(
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+    this.workerClient = serviceWorker.getWorkerClient();
+    this.configuration = configuration;
+
+    sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
+        configuration);
+    sendMessageCache =
+        new SendMessageCache<I, M>(configuration, serviceWorker);
+    maxMessagesSizePerWorker = configuration.getInt(
+        GiraphConstants.MAX_MSG_REQUEST_SIZE,
+        GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
+    maxMutationsPerPartition = configuration.getInt(
+        GiraphConstants.MAX_MUTATIONS_PER_REQUEST,
+        GiraphConstants.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+    this.serviceWorker = serviceWorker;
+    this.serverData = serviceWorker.getServerData();
+
+    // Per-Superstep Metrics.
+    // Since this object is not long lived we just initialize the metrics here.
+    GiraphMetrics gmr = GiraphMetrics.get();
+    msgsSentInSuperstep = new ValueGauge<Long>(gmr.perSuperstep(), "msgs-sent");
+  }
+
+  @Override
+  public boolean sendMessageRequest(I destVertexId, M message) {
+    PartitionOwner owner =
+        serviceWorker.getVertexPartitionOwner(destVertexId);
+    WorkerInfo workerInfo = owner.getWorkerInfo();
+    final int partitionId = owner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
+          ") to " + destVertexId + " on worker " + workerInfo);
+    }
+    ++totalMsgsSentInSuperstep;
+
+    // Add the message to the cache
+    int workerMessageSize = sendMessageCache.addMessage(
+        workerInfo, partitionId, destVertexId, message);
+
+    // Send a request if the cache of outgoing message to
+    // the remote worker 'workerInfo' is full enough to be flushed
+    if (workerMessageSize >= maxMessagesSizePerWorker) {
+      PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+          workerMessages =
+          sendMessageCache.removeWorkerMessages(workerInfo);
+      WritableRequest writableRequest =
+          new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+      doRequest(workerInfo, writableRequest);
+      return true;
+    }
+
+    // Cache below the flush threshold: message is buffered, no request sent.
+    return false;
+  }
+
+  @Override
+  public void sendPartitionRequest(WorkerInfo workerInfo,
+                                   Partition<I, V, E, M> partition) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("sendVertexRequest: Sending to " + workerInfo +
+          ", with partition " + partition);
+    }
+
+    WritableRequest vertexRequest =
+        new SendVertexRequest<I, V, E, M>(partition);
+    doRequest(workerInfo, vertexRequest);
+
+    // Messages are stored separately
+    if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
+      sendPartitionMessages(workerInfo, partition);
+    }
+  }
+
+  /**
+   * Send all messages for a partition to another worker.
+   *
+   * @param workerInfo Worker to send the partition messages to
+   * @param partition Partition whose messages to send
+   */
+  private void sendPartitionMessages(WorkerInfo workerInfo,
+                                     Partition<I, V, E, M> partition) {
+    final int partitionId = partition.getId();
+    MessageStoreByPartition<I, M> messageStore =
+        serverData.getCurrentMessageStore();
+    ByteArrayVertexIdMessages<I, M> vertexIdMessages =
+        new ByteArrayVertexIdMessages<I, M>();
+    vertexIdMessages.setConf(configuration);
+    vertexIdMessages.initialize();
+    for (I vertexId :
+        messageStore.getPartitionDestinationVertices(partitionId)) {
+      try {
+        // Messages cannot be re-used from this iterable, but add()
+        // serializes the message, making this safe
+        Iterable<M> messages = messageStore.getVertexMessages(vertexId);
+        for (M message : messages) {
+          vertexIdMessages.add(vertexId, message);
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "sendVertexRequest: Got IOException ", e);
+      }
+      // Flush when the serialized size passes the per-worker threshold.
+      // NOTE(review): uses '>' here but '>=' in sendMessageRequest --
+      // confirm the off-by-one difference is intentional.
+      if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
+        WritableRequest messagesRequest = new
+            SendPartitionCurrentMessagesRequest<I, V, E, M>(
+            partitionId, vertexIdMessages);
+        doRequest(workerInfo, messagesRequest);
+        vertexIdMessages =
+            new ByteArrayVertexIdMessages<I, M>();
+        vertexIdMessages.setConf(configuration);
+        vertexIdMessages.initialize();
+      }
+    }
+    // Flush whatever is left over after the last vertex.
+    if (!vertexIdMessages.isEmpty()) {
+      WritableRequest messagesRequest = new
+          SendPartitionCurrentMessagesRequest<I, V, E, M>(
+          partitionId, vertexIdMessages);
+      doRequest(workerInfo, messagesRequest);
+    }
+  }
+
+  @Override
+  public void sendVertexRequest(PartitionOwner partitionOwner,
+                                Vertex<I, V, E, M> vertex) {
+    // addVertex() returns a non-null partition only when the cache decided
+    // this partition is full enough to ship now.
+    Partition<I, V, E, M> partition =
+        sendPartitionCache.addVertex(partitionOwner, vertex);
+    if (partition == null) {
+      return;
+    }
+
+    sendPartitionRequest(partitionOwner.getWorkerInfo(), partition);
+  }
+
+  @Override
+  public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
+      IOException {
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
+    int partitionId = partitionOwner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
+          vertexIndex + " with partition " + partitionId);
+    }
+
+    // Add the message to the cache
+    int partitionMutationCount =
+        sendMutationsCache.addEdgeMutation(partitionId, vertexIndex, edge);
+
+    sendMutationsRequestIfFull(
+        partitionId, partitionOwner, partitionMutationCount);
+  }
+
+  /**
+   * Send a mutations request if the maximum number of mutations per partition
+   * was met.
+   *
+   * @param partitionId Partition id
+   * @param partitionOwner Owner of the partition
+   * @param partitionMutationCount Number of mutations for this partition
+   */
+  private void sendMutationsRequestIfFull(
+      int partitionId, PartitionOwner partitionOwner,
+      int partitionMutationCount) {
+    // Send a request if enough mutations are there for a partition
+    if (partitionMutationCount >= maxMutationsPerPartition) {
+      Map<I, VertexMutations<I, V, E, M>> partitionMutations =
+          sendMutationsCache.removePartitionMutations(partitionId);
+      WritableRequest writableRequest =
+          new SendPartitionMutationsRequest<I, V, E, M>(
+              partitionId, partitionMutations);
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
+    }
+  }
+
+  @Override
+  public void removeEdgesRequest(I vertexIndex,
+                                 I destinationVertexIndex) throws IOException {
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
+    int partitionId = partitionOwner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeEdgesRequest: Removing edge " +
+          destinationVertexIndex +
+          " for index " + vertexIndex + " with partition " + partitionId);
+    }
+
+    // Add the message to the cache
+    int partitionMutationCount =
+        sendMutationsCache.removeEdgeMutation(
+            partitionId, vertexIndex, destinationVertexIndex);
+
+    sendMutationsRequestIfFull(
+        partitionId, partitionOwner, partitionMutationCount);
+  }
+
+  @Override
+  public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertex.getId());
+    int partitionId = partitionOwner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("addVertexRequest: Sending vertex " + vertex +
+          " to partition " + partitionId);
+    }
+
+    // Add the message to the cache
+    int partitionMutationCount =
+        sendMutationsCache.addVertexMutation(partitionId, vertex);
+
+    sendMutationsRequestIfFull(
+        partitionId, partitionOwner, partitionMutationCount);
+  }
+
+  @Override
+  public void removeVertexRequest(I vertexIndex) throws IOException {
+    PartitionOwner partitionOwner =
+        serviceWorker.getVertexPartitionOwner(vertexIndex);
+    int partitionId = partitionOwner.getPartitionId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("removeVertexRequest: Removing vertex index " +
+          vertexIndex + " from partition " + partitionId);
+    }
+
+    // Add the message to the cache
+    int partitionMutationCount =
+        sendMutationsCache.removeVertexMutation(partitionId, vertexIndex);
+
+    sendMutationsRequestIfFull(
+        partitionId, partitionOwner, partitionMutationCount);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Execute the remaining send partitions (if any)
+    for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+        sendPartitionCache.getOwnerPartitionMap().entrySet()) {
+      sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
+    }
+    sendPartitionCache.clear();
+
+    // Execute the remaining sends messages (if any)
+    PairList<WorkerInfo, PairList<Integer,
+        ByteArrayVertexIdMessages<I, M>>>
+        remainingMessageCache = sendMessageCache.removeAllMessages();
+    PairList<WorkerInfo,
+        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+        iterator = remainingMessageCache.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      WritableRequest writableRequest =
+          new SendWorkerMessagesRequest<I, V, E, M>(
+              iterator.getCurrentSecond());
+      doRequest(iterator.getCurrentFirst(), writableRequest);
+    }
+
+    // Execute the remaining sends mutations (if any)
+    Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
+        sendMutationsCache.removeAllPartitionMutations();
+    for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
+        remainingMutationsCache.entrySet()) {
+      WritableRequest writableRequest =
+          new SendPartitionMutationsRequest<I, V, E, M>(
+              entry.getKey(), entry.getValue());
+      // Looks up the owner via an arbitrary vertex id from the mutation map;
+      // presumably every id in the map maps to this partition -- verify.
+      PartitionOwner partitionOwner =
+          serviceWorker.getVertexPartitionOwner(
+              entry.getValue().keySet().iterator().next());
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
+    }
+  }
+
+  @Override
+  public long resetMessageCount() {
+    // Publish the superstep's count to the gauge, then reset the counter.
+    msgsSentInSuperstep.set(totalMsgsSentInSuperstep);
+    long messagesSentInSuperstep = totalMsgsSentInSuperstep;
+    totalMsgsSentInSuperstep = 0;
+    return messagesSentInSuperstep;
+  }
+
+  @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerClient.getVertexPartitionOwner(vertexId);
+  }
+
+  /**
+   * When doing the request, short circuit if it is local
+   *
+   * @param workerInfo Worker info
+   * @param writableRequest Request to either submit or run locally
+   */
+  private void doRequest(WorkerInfo workerInfo,
+                         WritableRequest writableRequest) {
+    // If this is local, execute locally
+    if (serviceWorker.getWorkerInfo().getTaskId() ==
+        workerInfo.getTaskId()) {
+      ((WorkerRequest) writableRequest).doRequest(serverData);
+    } else {
+      workerClient.sendWritableRequest(
+          workerInfo.getTaskId(), writableRequest);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
new file mode 100644
index 0000000..8012397
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.messages.BasicMessageStore;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
+import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
+import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+import java.net.InetSocketAddress;
+import java.util.Set;
+
+/**
+ * Netty worker server that implements {@link WorkerServer} and contains
+ * the actual {@link ServerData}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class NettyWorkerServer<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerServer<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+    Logger.getLogger(NettyWorkerServer.class);
+  /** Hadoop configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+  /** Netty server that does that actual I/O */
+  private final NettyServer nettyServer;
+  /** Server data storage */
+  private final ServerData<I, V, E, M> serverData;
+
+  /**
+   * Constructor to start the server.
+   *
+   * @param conf Configuration
+   * @param service Service to get partition mappings
+   * @param context Mapper context
+   */
+  public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      CentralizedServiceWorker<I, V, E, M> service,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.service = service;
+
+    serverData =
+        new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
+
+    nettyServer = new NettyServer(conf,
+        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
+        service.getWorkerInfo(), context);
+    nettyServer.start();
+  }
+
+  /**
+   * Decide which message store should be used for current application,
+   * and create the factory for that store
+   *
+   * @return Message store factory
+   */
+  private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+  createMessageStoreFactory() {
+    boolean useOutOfCoreMessaging = conf.getBoolean(
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+    if (!useOutOfCoreMessaging) {
+      if (conf.useCombiner()) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("createMessageStoreFactory: " +
+              "Using OneMessagePerVertexStore since combiner enabled");
+        }
+        return OneMessagePerVertexStore.newFactory(service, conf);
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("createMessageStoreFactory: " +
+              "Using ByteArrayMessagesPerVertexStore " +
+              "since there is no combiner");
+        }
+        return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
+      }
+    } else {
+      int maxMessagesInMemory = conf.getInt(
+          GiraphConstants.MAX_MESSAGES_IN_MEMORY,
+          GiraphConstants.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
+            "maxMessagesInMemory = " + maxMessagesInMemory);
+      }
+      // Out-of-core path: sequential on-disk files wrapped by a disk-backed
+      // store that is further partitioned per graph partition.
+      MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
+          SequentialFileMessageStore.newFactory(conf);
+      MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+          partitionStoreFactory =
+          DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
+      return DiskBackedMessageStoreByPartition.newFactory(service,
+          maxMessagesInMemory, partitionStoreFactory);
+    }
+  }
+
+  @Override
+  public InetSocketAddress getMyAddress() {
+    return nettyServer.getMyAddress();
+  }
+
+  @Override
+  public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
+    serverData.prepareSuperstep();
+    resolveMutations(graphState);
+  }
+
+  @Override
+  public void resolveMutations(GraphState<I, V, E, M> graphState) {
+    // NOTE(review): the exception/log messages below still say
+    // "prepareSuperstep:" -- likely stale after this logic moved into
+    // resolveMutations; confirm and update the prefixes.
+    Set<I> resolveVertexIndexSet = Sets.newHashSet();
+    // Keep track of the vertices which are not here but have received messages
+    for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
+      for (I vertexId : serverData.getCurrentMessageStore().
+          getPartitionDestinationVertices(partitionId)) {
+        Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
+        if (vertex == null) {
+          if (!resolveVertexIndexSet.add(vertexId)) {
+            throw new IllegalStateException(
+                "prepareSuperstep: Already has missing vertex on this " +
+                    "worker for " + vertexId);
+          }
+        }
+      }
+    }
+
+    // Add any mutated vertex indices to be resolved
+    for (I vertexIndex : serverData.getVertexMutations().keySet()) {
+      if (!resolveVertexIndexSet.add(vertexIndex)) {
+        throw new IllegalStateException(
+            "prepareSuperstep: Already has missing vertex on this " +
+                "worker for " + vertexIndex);
+      }
+    }
+
+    // Resolve all graph mutations
+    for (I vertexIndex : resolveVertexIndexSet) {
+      VertexResolver<I, V, E, M> vertexResolver =
+          conf.createVertexResolver(graphState);
+      Vertex<I, V, E, M> originalVertex =
+          service.getVertex(vertexIndex);
+
+      VertexMutations<I, V, E, M> mutations = null;
+      VertexMutations<I, V, E, M> vertexMutations =
+          serverData.getVertexMutations().get(vertexIndex);
+      if (vertexMutations != null) {
+        // Copy while holding the mutations' monitor, then drop the entry.
+        synchronized (vertexMutations) {
+          mutations = vertexMutations.copy();
+        }
+        serverData.getVertexMutations().remove(vertexIndex);
+      }
+      Vertex<I, V, E, M> vertex = vertexResolver.resolve(
+          vertexIndex, originalVertex, mutations,
+          serverData.getCurrentMessageStore().
+              hasMessagesForVertex(vertexIndex));
+      graphState.getContext().progress();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("prepareSuperstep: Resolved vertex index " +
+            vertexIndex + " with original vertex " +
+            originalVertex + ", returned vertex " + vertex +
+            " on superstep " + service.getSuperstep() +
+            " with mutations " +
+            mutations);
+      }
+
+      Partition<I, V, E, M> partition =
+          service.getPartition(vertexIndex);
+      if (partition == null) {
+        throw new IllegalStateException(
+            "prepareSuperstep: No partition for index " + vertexIndex +
+                " in " + service.getPartitionStore() + " should have been " +
+                service.getVertexPartitionOwner(vertexIndex));
+      }
+      // A null resolved vertex means the resolver decided to delete it.
+      if (vertex != null) {
+        partition.putVertex(vertex);
+      } else if (originalVertex != null) {
+        partition.removeVertex(originalVertex.getId());
+      }
+    }
+
+    if (!serverData.getVertexMutations().isEmpty()) {
+      throw new IllegalStateException("prepareSuperstep: Illegally " +
+          "still has " + serverData.getVertexMutations().size() +
+          " mutations left.");
+    }
+  }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return serverData;
+  }
+
+  @Override
+  public void close() {
+    nettyServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
new file mode 100644
index 0000000..840fee4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+
+/**
+ * Implements SASL logic for Giraph BSP worker clients.
+ */
+public class SaslNettyClient {
+  /** Class logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyClient.class);
+
+  /**
+   * Used to synchronize client requests: client's work-related requests must
+   * wait until SASL authentication completes.
+   */
+  private Object authenticated = new Object();
+
+  /**
+   * Used to respond to server's counterpart, SaslServer with SASL tokens
+   * represented as byte arrays.
+   */
+  private SaslClient saslClient;
+
+  /**
+   * Create a SaslNettyClient for authentication with BSP servers.
+   */
+  public SaslNettyClient() {
+    try {
+      Token<? extends TokenIdentifier> token =
+          createJobToken(new Configuration());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslNettyClient: Creating SASL " +
+            AuthMethod.DIGEST.getMechanismName() +
+            " client to authenticate to service at " + token.getService());
+      }
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+    } catch (IOException e) {
+      LOG.error("SaslNettyClient: Could not obtain job token for Netty " +
+          "Client to use to authenticate with a Netty Server.");
+      saslClient = null;
+    }
+  }
+
+  public Object getAuthenticated() {
+    return authenticated;
+  }
+
+  /**
+   * Obtain JobToken, which we'll use as a credential for SASL authentication
+   * when connecting to other Giraph BSPWorkers.
+   *
+   * @param conf Configuration
+   * @return a JobToken containing username and password so that client can
+   * authenticate with a server.
+   */
+  private Token<JobTokenIdentifier> createJobToken(Configuration conf)
+    throws IOException {
+    String localJobTokenFile = System.getenv().get(
+        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile != null) {
+      JobConf jobConf = new JobConf(conf);
+      Credentials credentials =
+          TokenCache.loadTokens(localJobTokenFile, jobConf);
+      return TokenCache.getJobToken(credentials);
+    } else {
+      throw new IOException("createJobToken: Cannot obtain authentication " +
+          "credentials for job: file: '" +
+          UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION + "' not found");
+    }
+  }
+
+  /**
+   * Used by authenticateOnChannel() to initiate SASL handshake with server.
+   * @return SaslTokenMessageRequest message to be sent to server.
+   * @throws IOException
+   */
+  public SaslTokenMessageRequest firstToken()
+    throws IOException {
+    byte[] saslToken = new byte[0];
+    if (saslClient.hasInitialResponse()) {
+      saslToken = saslClient.evaluateChallenge(saslToken);
+    }
+    SaslTokenMessageRequest saslTokenMessage =
+        new SaslTokenMessageRequest();
+    saslTokenMessage.setSaslToken(saslToken);
+    return saslTokenMessage;
+  }
+
+  public boolean isComplete() {
+    return saslClient.isComplete();
+  }
+
+  /**
+   * Respond to server's SASL token.
+   * @param saslTokenMessage contains server's SASL token
+   * @return client's response SASL token
+   */
+  public byte[] saslResponse(SaslTokenMessageRequest saslTokenMessage) {
+    try {
+      byte[] retval =
+          saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
+      return retval;
+    } catch (SaslException e) {
+      LOG.error("saslResponse: Failed to respond to SASL server's token:", e);
+      return null;
+    }
+  }
+
+  /**
+   * Implementation of javax.security.auth.callback.CallbackHandler
+   * that works with Hadoop JobTokens.
+   */
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    /** Generated username contained in JobToken */
+    private final String userName;
+    /** Generated password contained in JobToken */
+    private final char[] userPassword;
+
+    /**
+     * Set private members using token.
+     * @param token Hadoop JobToken.
+     */
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = SaslNettyServer.encodeIdentifier(token.getIdentifier());
+      this.userPassword = SaslNettyServer.encodePassword(token.getPassword());
+    }
+
+    /**
+     * Implementation used to respond to SASL tokens from server.
+     *
+     * @param callbacks objects that indicate what credential information the
+     *                  server's SaslServer requires from the client.
+     * @throws UnsupportedCallbackException
+     */
+    public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting username: " +
+              userName);
+        }
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting userPassword");
+        }
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting realm: " +
+              rc.getDefaultText());
+        }
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
new file mode 100644
index 0000000..2cbf482
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.netty;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.classification.InterfaceStability;
+/*if[HADOOP_1_SECURITY]
+else[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.ipc.StandbyException;
+/*end[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+
+/**
+ * Encapsulates SASL server logic for Giraph BSP worker servers.
+ */
+public class SaslNettyServer extends SaslRpcServer {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyServer.class);
+
+  /**
+   * Actual SASL work done by this object from javax.security.sasl.
+   * Initialized below in constructor.
+   */
+  private SaslServer saslServer;
+
+  /**
+   * Constructor
+   *
+   * @param secretManager supplied by SaslServerHandler.
+   */
+  public SaslNettyServer(JobTokenSecretManager secretManager) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SaslNettyServer: Secret manager is: " + secretManager);
+    }
+/*if[HADOOP_1_SECRET_MANAGER]
+else[HADOOP_1_SECRET_MANAGER]*/
+    try {
+      secretManager.checkAvailableForRead();
+    } catch (StandbyException e) {
+      LOG.error("SaslNettyServer: Could not read secret manager: " + e);
+    }
+/*end[HADOOP_1_SECRET_MANAGER]*/
+    try {
+      SaslDigestCallbackHandler ch =
+          new SaslNettyServer.SaslDigestCallbackHandler(secretManager);
+      saslServer = Sasl.createSaslServer(SaslNettyServer.AuthMethod.DIGEST
+          .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, ch);
+    } catch (SaslException e) {
+      LOG.error("SaslNettyServer: Could not create SaslServer: " + e);
+    }
+  }
+
+  public boolean isComplete() {
+    return saslServer.isComplete();
+  }
+
+  public String getUserName() {
+    return saslServer.getAuthorizationID();
+  }
+
+  /**
+   * Used by SaslTokenMessage::processToken() to respond to server SASL tokens.
+   *
+   * @param token Server's SASL token
+   * @return token to send back to the server.
+   */
+  public byte[] response(byte[] token) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response: Responding to input token of length: " +
+            token.length);
+      }
+      byte[] retval = saslServer.evaluateResponse(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response: Response token length: " + retval.length);
+      }
+      return retval;
+    } catch (SaslException e) {
+      LOG.error("response: Failed to evaluate client token of length: " +
+          token.length + " : " + e);
+      return null;
+    }
+  }
+
+  /**
+   * Encode a byte[] identifier as a Base64-encoded string.
+   *
+   * @param identifier identifier to encode
+   * @return Base64-encoded string
+   */
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier));
+  }
+
+  /**
+   * Encode a password as a base64-encoded char[] array.
+   * @param password as a byte array.
+   * @return password as a char array.
+   */
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password)).toCharArray();
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  @InterfaceStability.Evolving
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    /** Used to authenticate the clients */
+    private JobTokenSecretManager secretManager;
+
+    /**
+     * Constructor
+     *
+     * @param secretManager used to authenticate clients
+     */
+    public SaslDigestCallbackHandler(
+        JobTokenSecretManager secretManager) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler " +
+            "with secret manager: " + secretManager);
+      }
+      this.secretManager = secretManager;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        JobTokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
+            secretManager);
+        char[] password =
+          encodePassword(secretManager.retrievePassword(tokenIdentifier));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+              "password for client: " + tokenIdentifier.getUser());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        if (authid.equals(authzid)) {
+          ac.setAuthorized(true);
+        } else {
+          ac.setAuthorized(false);
+        }
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username =
+              getIdentifier(authzid, secretManager).getUser().getUserName();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+                  "canonicalized client ID: " + username);
+            }
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
new file mode 100644
index 0000000..be92dde
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
+
+/**
+ * Uses composition to learn more about what
+ * AdaptiveReceiveBufferSizePredictor has determined what the actual
+ * sizes are.
+ */
+public class WrappedAdaptiveReceiveBufferSizePredictorFactory implements
+    ReceiveBufferSizePredictorFactory {
+  /** Internal predictor */
+  private final ReceiveBufferSizePredictor receiveBufferSizePredictor;
+
+  /**
+   * Constructor with defaults.
+   */
+  public WrappedAdaptiveReceiveBufferSizePredictorFactory() {
+    receiveBufferSizePredictor =
+        new WrappedAdaptiveReceiveBufferSizePredictor();
+  }
+
+  /**
+   * Creates a new predictor with the specified parameters.
+   *
+   * @param minimum The inclusive lower bound of the expected buffer size
+   * @param initial The initial buffer size when no feed back was received
+   * @param maximum The inclusive upper bound of the expected buffer size
+   */
+  public WrappedAdaptiveReceiveBufferSizePredictorFactory(int minimum,
+                                                          int initial,
+                                                          int maximum) {
+    receiveBufferSizePredictor = new WrappedAdaptiveReceiveBufferSizePredictor(
+        minimum, initial, maximum);
+  }
+
+  @Override
+  public ReceiveBufferSizePredictor getPredictor() throws Exception {
+    return receiveBufferSizePredictor;
+  }
+
+  /**
+   * Uses composition to expose
+   * details of AdaptiveReceiveBufferSizePredictor.
+   */
+  private static class WrappedAdaptiveReceiveBufferSizePredictor implements
+      ReceiveBufferSizePredictor {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(
+        WrappedAdaptiveReceiveBufferSizePredictor.class);
+    /** Internally delegated predictor */
+    private final AdaptiveReceiveBufferSizePredictor
+    adaptiveReceiveBufferSizePredictor;
+    /** Number of calls to nextReceiveBufferSize()  */
+    private long nextReceiveBufferSizeCount = 0;
+    /** Number of calls to previousReceiveBufferSize()  */
+    private long previousReceiveBufferSizeCount = 0;
+
+    /**
+     * Creates a new predictor with the default parameters.  With the default
+     * parameters, the expected buffer size starts from {@code 1024}, does not
+     * go down below {@code 64}, and does not go up above {@code 65536}.
+     */
+    public WrappedAdaptiveReceiveBufferSizePredictor() {
+      adaptiveReceiveBufferSizePredictor =
+          new AdaptiveReceiveBufferSizePredictor();
+    }
+
+    /**
+     * Creates a new predictor with the specified parameters.
+     *
+     * @param minimum  the inclusive lower bound of the expected buffer size
+     * @param initial  the initial buffer size when no feed back was received
+     * @param maximum  the inclusive upper bound of the expected buffer size
+     */
+    public WrappedAdaptiveReceiveBufferSizePredictor(int minimum,
+                                                     int initial,
+                                                     int maximum) {
+      adaptiveReceiveBufferSizePredictor =
+          new AdaptiveReceiveBufferSizePredictor(minimum, initial, maximum);
+    }
+
+    @Override
+    public int nextReceiveBufferSize() {
+      int nextReceiveBufferSize =
+          adaptiveReceiveBufferSizePredictor.nextReceiveBufferSize();
+      if (LOG.isDebugEnabled()) {
+        if (nextReceiveBufferSizeCount % 1000 == 0) {
+          LOG.debug("nextReceiveBufferSize: size " +
+              nextReceiveBufferSize +            " " +
+              "count " + nextReceiveBufferSizeCount);
+        }
+        ++nextReceiveBufferSizeCount;
+      }
+      return nextReceiveBufferSize;
+    }
+
+    @Override
+    public void previousReceiveBufferSize(int previousReceiveBufferSize) {
+      if (LOG.isDebugEnabled()) {
+        if (previousReceiveBufferSizeCount % 1000 == 0) {
+          LOG.debug("previousReceiveBufferSize: size " +
+              previousReceiveBufferSize +
+              ", count " + previousReceiveBufferSizeCount);
+        }
+        ++previousReceiveBufferSizeCount;
+      }
+      adaptiveReceiveBufferSizePredictor.previousReceiveBufferSize(
+          previousReceiveBufferSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
new file mode 100644
index 0000000..8ba5b96
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
import com.google.common.collect.Maps;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generate different request ids based on the address of the well known
+ * port on the workers.  Thread-safe.
+ */
+public class AddressRequestIdGenerator {
+  /** Address request generator map */
+  private final ConcurrentMap<InetSocketAddress, AtomicLong>
+  addressRequestGeneratorMap = Maps.newConcurrentMap();
+
+  /**
+   * Get the next request id for a given destination.  Thread-safe.
+   *
+   * @param address Address of the worker (consistent during a superstep)
+   * @return Valid request id
+   */
+  public Long getNextRequestId(InetSocketAddress address) {
+    AtomicLong requestGenerator = addressRequestGeneratorMap.get(address);
+    if (requestGenerator == null) {
+      requestGenerator = new AtomicLong(0);
+      AtomicLong oldRequestGenerator =
+          addressRequestGeneratorMap.putIfAbsent(address, requestGenerator);
+      if (oldRequestGenerator != null) {
+        requestGenerator = oldRequestGenerator;
+      }
+    }
+    return requestGenerator.getAndIncrement();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
new file mode 100644
index 0000000..274e61c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Authorize or deny client requests based on existence and completeness
+ * of client's SASL authentication.
+ */
+public class AuthorizeServerHandler extends
+    SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(AuthorizeServerHandler.class);
+
+  /**
+   * Constructor.
+   */
+  public AuthorizeServerHandler() {
+  }
+
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+    }
+    // Authorize: client is allowed to doRequest() if and only if the client
+    // has successfully authenticated with this server.
+    SaslNettyServer saslNettyServer =
+        NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+    if (saslNettyServer == null) {
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action since there's no saslNettyServer to " +
+          "authenticate the client: " +
+          "refusing to perform requested action: " + e.getMessage());
+      return;
+    }
+
+    if (!saslNettyServer.isComplete()) {
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action because SASL authentication did not complete: " +
+          "refusing to perform requested action: " + e.getMessage());
+      // Return now *WITHOUT* sending upstream here, since client
+      // not authorized.
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: authenticated client: " +
+          saslNettyServer.getUserName() + " is authorized to do request " +
+          "on server.");
+    }
+    // We call sendUpstream() since the client is allowed to perform this
+    // request. The client's request will now proceed to the next
+    // pipeline component, namely, RequestServerHandler.
+    ctx.sendUpstream(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
new file mode 100644
index 0000000..0050eed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+/**
+ * Simple immutable object to use for tracking requests uniquely.  This
+ * object is guaranteed to be unique for a given client (based on the
+ * destination task and the request).
+ */
+public class ClientRequestId {
+  /** Destination task id */
+  private final int destinationTaskId;
+  /** Request id */
+  private final long requestId;
+
+  /**
+   * Constructor.
+   *
+   * @param destinationTaskId Destination task id
+   * @param requestId Request id
+   */
+  public ClientRequestId(int destinationTaskId, long requestId) {
+    this.destinationTaskId = destinationTaskId;
+    this.requestId = requestId;
+  }
+
+  public int getDestinationTaskId() {
+    return destinationTaskId;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  @Override
+  public int hashCode() {
+    return (29 * destinationTaskId) + (int) (57 * requestId);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ClientRequestId) {
+      ClientRequestId otherObj = (ClientRequestId) other;
+      if (otherObj.getRequestId() == requestId &&
+          otherObj.getDestinationTaskId() == destinationTaskId) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "(destTask=" + destinationTaskId + ",reqId=" + requestId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
new file mode 100644
index 0000000..b3f0121
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.requests.MasterRequest;
+import org.apache.giraph.graph.MasterAggregatorHandler;
+import org.apache.giraph.graph.TaskInfo;
+
+/** Handler for requests on master */
+public class MasterRequestServerHandler extends
+    RequestServerHandler<MasterRequest> {
+  /** Aggregator handler */
+  private final MasterAggregatorHandler aggregatorHandler;
+
+  /**
+   * Constructor
+   *
+   * @param workerRequestReservedMap Worker request reservation map
+   * @param conf                     Configuration
+   * @param myTaskInfo               Current task info
+   * @param aggregatorHandler        Master aggregator handler
+   */
+  public MasterRequestServerHandler(
+      WorkerRequestReservedMap workerRequestReservedMap,
+      ImmutableClassesGiraphConfiguration conf,
+      TaskInfo myTaskInfo,
+      MasterAggregatorHandler aggregatorHandler) {
+    super(workerRequestReservedMap, conf, myTaskInfo);
+    this.aggregatorHandler = aggregatorHandler;
+  }
+
+  @Override
+  public void processRequest(MasterRequest request) {
+    request.doRequest(aggregatorHandler);
+  }
+
+  /**
+   * Factory for {@link MasterRequestServerHandler}
+   */
+  public static class Factory implements RequestServerHandler.Factory {
+    /** Master aggregator handler */
+    private final MasterAggregatorHandler aggregatorHandler;
+
+    /**
+     * Constructor
+     *
+     * @param aggregatorHandler Master aggregator handler
+     */
+    public Factory(MasterAggregatorHandler aggregatorHandler) {
+      this.aggregatorHandler = aggregatorHandler;
+    }
+
+    @Override
+    public RequestServerHandler newHandler(
+        WorkerRequestReservedMap workerRequestReservedMap,
+        ImmutableClassesGiraphConfiguration conf,
+        TaskInfo myTaskInfo) {
+      return new MasterRequestServerHandler(workerRequestReservedMap, conf,
+          myTaskInfo, aggregatorHandler);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
new file mode 100644
index 0000000..0bf21e5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.netty.ByteCounter;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+
+/**
+ * Decodes encoded requests from the client.
+ */
+public class RequestDecoder extends OneToOneDecoder {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RequestDecoder.class);
+  /** Time class to use */
+  private static final Time TIME = SystemTime.get();
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Byte counter to output */
+  private final ByteCounter byteCounter;
+  /** Start nanoseconds for the decoding time */
+  private long startDecodingNanoseconds = -1;
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param byteCounter Keeps track of the decoded bytes
+   */
+  public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
+                        ByteCounter byteCounter) {
+    this.conf = conf;
+    this.byteCounter = byteCounter;
+  }
+
+  @Override
+  protected Object decode(ChannelHandlerContext ctx,
+      Channel channel, Object msg) throws Exception {
+    if (!(msg instanceof ChannelBuffer)) {
+      throw new IllegalStateException("decode: Got illegal message " + msg);
+    }
+
+    // Output metrics every 1/2 minute
+    String metrics = byteCounter.getMetricsWindow(30000);
+    if (metrics != null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("decode: Server window metrics " + metrics);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      startDecodingNanoseconds = TIME.getNanoseconds();
+    }
+
+    // Decode the request
+    ChannelBuffer buffer = (ChannelBuffer) msg;
+    ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+    int enumValue = inputStream.readByte();
+    RequestType type = RequestType.values()[enumValue];
+    Class<? extends WritableRequest> writableRequestClass =
+        type.getRequestClass();
+
+    WritableRequest writableRequest =
+        ReflectionUtils.newInstance(writableRequestClass, conf);
+    writableRequest.readFields(inputStream);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("decode: Client " + writableRequest.getClientId() +
+          ", requestId " + writableRequest.getRequestId() +
+          ", " +  writableRequest.getType() + ", with size " +
+          buffer.array().length + " took " +
+          Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
+    }
+
+    return writableRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
new file mode 100644
index 0000000..eade731
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
/**
 * Requests have a request type and an encoded request.
 *
 * Wire format produced here (consumed by the server-side frame decoder and
 * {@link RequestDecoder}): a 4-byte length prefix covering everything after
 * the prefix, a 1-byte request-type ordinal, then the serialized request.
 */
public class RequestEncoder extends OneToOneEncoder {
  /** Time class to use */
  private static final Time TIME = SystemTime.get();
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
  /** Holds the place of the message length until known */
  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
  /** Buffer starting size */
  private final int bufferStartingSize;
  /** Start nanoseconds for the encoding time */
  private long startEncodingNanoseconds = -1;

  /**
   * Constructor.
   *
   * @param bufferStartingSize Starting size of the buffer
   */
  public RequestEncoder(int bufferStartingSize) {
    this.bufferStartingSize = bufferStartingSize;
  }

  @Override
  protected Object encode(ChannelHandlerContext ctx,
                          Channel channel, Object msg) throws Exception {
    if (!(msg instanceof WritableRequest)) {
      throw new IllegalArgumentException(
          "encode: Got a message of type " + msg.getClass());
    }

    // Encode the request
    if (LOG.isDebugEnabled()) {
      startEncodingNanoseconds = TIME.getNanoseconds();
    }
    WritableRequest writableRequest = (WritableRequest) msg;
    int requestSize = writableRequest.getSerializedSize();
    ChannelBufferOutputStream outputStream;
    // When the request reports its serialized size, allocate an exact-sized
    // direct buffer; otherwise fall back to a growable dynamic buffer.
    if (requestSize == WritableRequest.UNKNOWN_SIZE) {
      outputStream =
          new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
              bufferStartingSize,
              ctx.getChannel().getConfig().getBufferFactory()));
    } else {
      // Make room for the 4-byte length prefix and the 1-byte request type
      requestSize += LENGTH_PLACEHOLDER.length + 1;
      outputStream = new ChannelBufferOutputStream(
          ChannelBuffers.directBuffer(requestSize));
    }
    // Reserve the length slot first; the real value is patched in below
    // once the total written size is known.
    outputStream.write(LENGTH_PLACEHOLDER);
    outputStream.writeByte(writableRequest.getType().ordinal());
    try {
      writableRequest.write(outputStream);
    } catch (IndexOutOfBoundsException e) {
      // A fixed-size direct buffer overflowed, which means the request
      // under-reported its serialized size.
      LOG.error("encode: Most likely the size of request was not properly " +
          "specified - see getSerializedSize() in " +
          writableRequest.getType().getRequestClass());
      throw new IllegalStateException(e);
    }
    outputStream.flush();
    outputStream.close();

    // Set the correct size at the end
    // (writerIndex - 4 = number of bytes following the length prefix)
    ChannelBuffer encodedBuffer = outputStream.buffer();
    encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4);
    if (LOG.isDebugEnabled()) {
      LOG.debug("encode: Client " + writableRequest.getClientId() + ", " +
          "requestId " + writableRequest.getRequestId() +
          ", size = " + encodedBuffer.writerIndex() + ", " +
          writableRequest.getType() + " took " +
          Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
    }
    return encodedBuffer;
  }
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
new file mode 100644
index 0000000..97f5beb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.jboss.netty.channel.ChannelFuture;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+/**
+ * Help track requests throughout the system
+ */
+public class RequestInfo {
+  /** Time class to use */
+  private static final Time TIME = SystemTime.get();
+  /** Destination of the request */
+  private final InetSocketAddress destinationAddress;
+  /** When the request was started */
+  private final long startedNanos;
+  /** Request */
+  private final WritableRequest request;
+  /** Future of the write of this request*/
+  private volatile ChannelFuture writeFuture;
+
+  /**
+   * Constructor.
+   *
+   * @param destinationAddress Destination of the request
+   * @param request Request that is sent
+   */
+  public RequestInfo(InetSocketAddress destinationAddress,
+                     WritableRequest request) {
+    this.destinationAddress = destinationAddress;
+    this.request = request;
+    this.startedNanos = TIME.getNanoseconds();
+  }
+
+  public InetSocketAddress getDestinationAddress() {
+    return destinationAddress;
+  }
+
+  /**
+   * Get the started msecs.
+   *
+   * @return Started msecs
+   */
+  public long getStartedMsecs() {
+    return startedNanos / Time.NS_PER_MS;
+  }
+
+  /**
+   * Get the elapsed nanoseconds since the request started.
+   *
+   * @return Nanoseconds since the request was started
+   */
+  public long getElapsedNanos() {
+    return TIME.getNanoseconds() - startedNanos;
+  }
+
+  /**
+   * Get the elapsed millseconds since the request started.
+   *
+   * @return Milliseconds since the request was started
+   */
+  public long getElapsedMsecs() {
+    return getElapsedNanos() / Time.NS_PER_MS;
+  }
+
+
+  public WritableRequest getRequest() {
+    return request;
+  }
+
+  public void setWriteFuture(ChannelFuture writeFuture) {
+    this.writeFuture = writeFuture;
+  }
+
+  public ChannelFuture getWriteFuture() {
+    return writeFuture;
+  }
+
+  @Override
+  public String toString() {
+    return "(reqId=" + request.getRequestId() +
+        ",destAddr=" + destinationAddress.getHostName() + ":" +
+        destinationAddress.getPort() +
+        ",elapsedNanos=" +
+        getElapsedNanos() +
+        ",started=" + new Date(getStartedMsecs()) +
+        ((writeFuture == null) ? ")" :
+            ",writeDone=" + writeFuture.isDone() +
+                ",writeSuccess=" + writeFuture.isSuccess() + ")");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
new file mode 100644
index 0000000..cd26ea2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Generic handler of requests.
+ *
+ * @param <R> Request type
+ */
+public abstract class RequestServerHandler<R> extends
+    SimpleChannelUpstreamHandler {
+  /** Number of bytes in the encoded response */
+  public static final int RESPONSE_BYTES = 13;
+  /** Time class to use */
+  private static Time TIME = SystemTime.get();
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RequestServerHandler.class);
+  /** Already closed first request? */
+  private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+  /** Close connection on first request (used for simulating failure) */
+  private final boolean closeFirstRequest;
+  /** Request reserved map (for exactly one semantics) */
+  private final WorkerRequestReservedMap workerRequestReservedMap;
+  /** My task info */
+  private final TaskInfo myTaskInfo;
+  /** Start nanoseconds for the processing time */
+  private long startProcessingNanoseconds = -1;
+
+  /**
+   * Constructor
+   *
+   * @param workerRequestReservedMap Worker request reservation map
+   * @param conf Configuration
+   * @param myTaskInfo Current task info
+   */
+  public RequestServerHandler(
+      WorkerRequestReservedMap workerRequestReservedMap,
+      ImmutableClassesGiraphConfiguration conf,
+      TaskInfo myTaskInfo) {
+    this.workerRequestReservedMap = workerRequestReservedMap;
+    closeFirstRequest = conf.getBoolean(
+        GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+        GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+    this.myTaskInfo = myTaskInfo;
+  }
+
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("messageReceived: Got " + e.getMessage().getClass());
+    }
+
+    WritableRequest writableRequest = (WritableRequest) e.getMessage();
+
+    // Simulate a closed connection on the first request (if desired)
+    if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
+      LOG.info("messageReceived: Simulating closing channel on first " +
+          "request " + writableRequest.getRequestId() + " from " +
+          writableRequest.getClientId());
+      ALREADY_CLOSED_FIRST_REQUEST = true;
+      ctx.getChannel().close();
+      return;
+    }
+
+    // Only execute this request exactly once
+    int alreadyDone = 1;
+    if (workerRequestReservedMap.reserveRequest(
+        writableRequest.getClientId(),
+        writableRequest.getRequestId())) {
+      if (LOG.isDebugEnabled()) {
+        startProcessingNanoseconds = TIME.getNanoseconds();
+      }
+      processRequest((R) writableRequest);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("messageReceived: Processing client " +
+            writableRequest.getClientId() + ", " +
+            "requestId " + writableRequest.getRequestId() +
+            ", " +  writableRequest.getType() + " took " +
+            Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
+      }
+      alreadyDone = 0;
+    } else {
+      LOG.info("messageReceived: Request id " +
+          writableRequest.getRequestId() + " from client " +
+          writableRequest.getClientId() +
+          " was already processed, " +
+          "not processing again.");
+    }
+
+    // Send the response with the request id
+    ChannelBuffer buffer = ChannelBuffers.directBuffer(RESPONSE_BYTES);
+    buffer.writeInt(myTaskInfo.getTaskId());
+    buffer.writeLong(writableRequest.getRequestId());
+    buffer.writeByte(alreadyDone);
+    e.getChannel().write(buffer);
+  }
+
+  /**
+   * Process request
+   *
+   * @param request Request to process
+   */
+  public abstract void processRequest(R request);
+
+  @Override
+  public void channelConnected(ChannelHandlerContext ctx,
+                               ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelConnected: Connected the channel on " +
+          ctx.getChannel().getRemoteAddress());
+    }
+  }
+
+  @Override
+  public void channelClosed(ChannelHandlerContext ctx,
+                            ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelClosed: Closed the channel on " +
+          ctx.getChannel().getRemoteAddress() + " with event " +
+          e);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("exceptionCaught: Channel failed with " +
+        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+  }
+
+  /**
+   * Factory for {@link RequestServerHandler}
+   */
+  public interface Factory {
+    /**
+     * Create new {@link RequestServerHandler}
+     *
+     * @param workerRequestReservedMap Worker request reservation map
+     * @param conf Configuration to use
+     * @param myTaskInfo Current task info
+     * @return New {@link RequestServerHandler}
+     */
+    RequestServerHandler newHandler(
+        WorkerRequestReservedMap workerRequestReservedMap,
+        ImmutableClassesGiraphConfiguration conf,
+        TaskInfo myTaskInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
new file mode 100644
index 0000000..1803be4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Generic handler of responses.
+ */
+public class ResponseClientHandler extends SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ResponseClientHandler.class);
+  /** Already dropped first response? (used if dropFirstResponse == true) */
+  private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
+  /** Drop first response (used for simulating failure) */
+  private final boolean dropFirstResponse;
+  /** Outstanding worker request map */
+  private final ConcurrentMap<ClientRequestId, RequestInfo>
+  workerIdOutstandingRequestMap;
+
+  /**
+   * Constructor.
+   *
+   * @param workerIdOutstandingRequestMap Map of worker ids to outstanding
+   *                                      requests
+   * @param conf Configuration
+   */
+  public ResponseClientHandler(
+      ConcurrentMap<ClientRequestId, RequestInfo>
+          workerIdOutstandingRequestMap,
+      Configuration conf) {
+    this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
+    dropFirstResponse = conf.getBoolean(
+        GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
+        GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
+  }
+
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent event) {
+    if (!(event.getMessage() instanceof ChannelBuffer)) {
+      throw new IllegalStateException("messageReceived: Got a " +
+          "non-ChannelBuffer message " + event.getMessage());
+    }
+
+    ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
+    ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+    int senderId = -1;
+    long requestId = -1;
+    int response = -1;
+    try {
+      senderId = inputStream.readInt();
+      requestId = inputStream.readLong();
+      response = inputStream.readByte();
+      inputStream.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "messageReceived: Got IOException ", e);
+    }
+
+    // Simulate a failed response on the first response (if desired)
+    if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
+      LOG.info("messageReceived: Simulating dropped response " + response +
+          " for request " + requestId);
+      ALREADY_DROPPED_FIRST_RESPONSE = true;
+      synchronized (workerIdOutstandingRequestMap) {
+        workerIdOutstandingRequestMap.notifyAll();
+      }
+      return;
+    }
+
+    if (response == 1) {
+      LOG.info("messageReceived: Already completed request " + requestId);
+    } else if (response != 0) {
+      throw new IllegalStateException(
+          "messageReceived: Got illegal response " + response);
+    }
+
+    RequestInfo requestInfo = workerIdOutstandingRequestMap.remove(
+        new ClientRequestId(senderId, requestId));
+    if (requestInfo == null) {
+      LOG.info("messageReceived: Already received response for request id = " +
+          requestId);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("messageReceived: Completed " + requestInfo +
+            ".  Waiting on " + workerIdOutstandingRequestMap.size() +
+            " requests");
+      }
+    }
+
+    // Help NettyClient#waitSomeRequests() to finish faster
+    synchronized (workerIdOutstandingRequestMap) {
+      workerIdOutstandingRequestMap.notifyAll();
+    }
+  }
+
+  @Override
+  public void channelClosed(ChannelHandlerContext ctx,
+                            ChannelStateEvent e) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("channelClosed: Closed the channel on " +
+          ctx.getChannel().getRemoteAddress());
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("exceptionCaught: Channel failed with " +
+        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+  }
+}


Mime
View raw message