giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [40/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:33 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
deleted file mode 100644
index e148c8c..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import org.apache.giraph.comm.requests.RequestType;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-/**
- * How a server should respond to a client. Currently only used for
- * responding to client's SASL messages, and removed after client
- * authenticates.
- */
-public class ResponseEncoder extends OneToOneEncoder {
-  /** Class logger. */
-  private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
-  /** Holds the place of the message length until known. */
-  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
-
-  @Override
-  protected Object encode(ChannelHandlerContext ctx,
-      Channel channel, Object msg) throws Exception {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("encode(" + ctx + "," + channel + "," + msg);
-    }
-
-    if (!(msg instanceof WritableRequest)) {
-      throw new IllegalArgumentException(
-          "encode: cannot encode message of type " + msg.getClass() +
-              " since it is not an instance of an implementation of " +
-              " WritableRequest.");
-    }
-    @SuppressWarnings("unchecked")
-    WritableRequest writableRequest =
-      (WritableRequest) msg;
-    ChannelBufferOutputStream outputStream =
-      new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
-        10, ctx.getChannel().getConfig().getBufferFactory()));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("encode: Encoding a message of type " + msg.getClass());
-    }
-    outputStream.write(LENGTH_PLACEHOLDER);
-
-    // write type of object.
-    outputStream.writeByte(writableRequest.getType().ordinal());
-
-    // write the object itself.
-    writableRequest.write(outputStream);
-
-    outputStream.flush();
-
-    // Set the correct size at the end.
-    ChannelBuffer encodedBuffer = outputStream.buffer();
-    encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("encode: Encoding a message of type " + msg.getClass());
-    }
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-    if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
-      // We are sending to the client a SASL_COMPLETE response (created by
-      // the SaslServer handler). The SaslServer handler has removed itself
-      // from the pipeline after creating this response, and now it's time for
-      // the ResponseEncoder to remove itself also.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("encode: Removing RequestEncoder handler: no longer needed," +
-            " since client: " + ctx.getChannel().getRemoteAddress() + " has " +
-            "completed authenticating.");
-      }
-      ctx.getPipeline().remove(this);
-    }
-/*end[HADOOP_NON_SECURE]*/
-    return encodedBuffer;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
deleted file mode 100644
index b26a314..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.comm.netty.SaslNettyClient;
-import org.apache.giraph.comm.requests.RequestType;
-import org.apache.giraph.comm.requests.SaslCompleteRequest;
-import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-
-import java.io.IOException;
-
-/**
- * Client-side Netty pipeline component that allows authentication with a
- * server.
- */
-public class SaslClientHandler extends OneToOneDecoder {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(SaslClientHandler.class);
-  /** Configuration */
-  private final Configuration conf;
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration
-   */
-  public SaslClientHandler(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void handleUpstream(
-    ChannelHandlerContext ctx, ChannelEvent evt)
-    throws Exception {
-    if (!(evt instanceof MessageEvent)) {
-      ctx.sendUpstream(evt);
-      return;
-    }
-    MessageEvent e = (MessageEvent) evt;
-    Object originalMessage = e.getMessage();
-    Object decodedMessage = decode(ctx, ctx.getChannel(), originalMessage);
-    // Generate SASL response to server using Channel-local SASL client.
-    SaslNettyClient saslNettyClient = NettyClient.SASL.get(ctx.getChannel());
-    if (saslNettyClient == null) {
-      throw new Exception("handleUpstream: saslNettyClient was unexpectedly " +
-          "null for channel: " + ctx.getChannel());
-    }
-    if (decodedMessage.getClass() == SaslCompleteRequest.class) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("handleUpstream: Server has sent us the SaslComplete " +
-            "message. Allowing normal work to proceed.");
-      }
-      synchronized (saslNettyClient.getAuthenticated()) {
-        saslNettyClient.getAuthenticated().notify();
-      }
-      if (!saslNettyClient.isComplete()) {
-        LOG.error("handleUpstream: Server returned a Sasl-complete message, " +
-            "but as far as we can tell, we are not authenticated yet.");
-        throw new Exception("handleUpstream: Server returned a " +
-            "Sasl-complete message, but as far as " +
-            "we can tell, we are not authenticated yet.");
-      }
-      // Remove SaslClientHandler from the client pipeline and replace its
-      // LengthFieldBasedFrameDecoder with a FixedLengthFrameDecoder.
-      ctx.getPipeline().remove(this);
-      ctx.getPipeline().replace("length-field-based-frame-decoder",
-          "fixed-length-frame-decoder",
-          new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
-      return;
-    }
-    SaslTokenMessageRequest serverToken =
-      (SaslTokenMessageRequest) decodedMessage;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("handleUpstream: Responding to server's token of length: " +
-          serverToken.getSaslToken().length);
-    }
-    // Generate SASL response (but we only actually send the response if it's
-    // non-null).
-    byte[] responseToServer = saslNettyClient.saslResponse(serverToken);
-    if (responseToServer == null) {
-      // If we generate a null response, then authentication has completed (if
-      // not, warn), and return without sending a response back to the server.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("handleUpstream: Response to server is null: " +
-            "authentication should now be complete.");
-      }
-      if (!saslNettyClient.isComplete()) {
-        LOG.warn("handleUpstream: Generated a null response, " +
-            "but authentication is not complete.");
-      }
-      return;
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("handleUpstream: Response to server token has length:" +
-            responseToServer.length);
-      }
-    }
-    // Construct a message containing the SASL response and send it to the
-    // server.
-    SaslTokenMessageRequest saslResponse =
-      new SaslTokenMessageRequest(responseToServer);
-    ctx.getChannel().write(saslResponse);
-  }
-
-  @Override
-  protected Object decode(ChannelHandlerContext ctx,
-                          Channel channel, Object msg) throws Exception {
-    if (!(msg instanceof ChannelBuffer)) {
-      throw new IllegalStateException("decode: Got illegal message " + msg);
-    }
-    // Decode msg into an object whose class C implements WritableRequest:
-    //  C will be either SaslTokenMessage or SaslComplete.
-    //
-    // 1. Convert message to a stream that can be decoded.
-    ChannelBuffer buffer = (ChannelBuffer) msg;
-    ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
-    // 2. Get first byte: message type:
-    int enumValue = inputStream.readByte();
-    RequestType type = RequestType.values()[enumValue];
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("decode: Got a response of type " + type + " from server:" +
-        channel.getRemoteAddress());
-    }
-    // 3. Create object of the type determined in step 2.
-    Class<? extends WritableRequest> writableRequestClass =
-      type.getRequestClass();
-    WritableRequest serverResponse =
-      ReflectionUtils.newInstance(writableRequestClass, conf);
-    // 4. Deserialize the inputStream's contents into the newly-constructed
-    // serverResponse object.
-    try {
-      serverResponse.readFields(inputStream);
-    } catch (IOException e) {
-      LOG.error("decode: Exception when trying to read server response: " + e);
-    }
-    // serverResponse can now be used in the next stage in pipeline.
-    return serverResponse;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
deleted file mode 100644
index d06fd09..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import org.apache.giraph.comm.netty.NettyServer;
-import org.apache.giraph.comm.netty.SaslNettyServer;
-import org.apache.giraph.comm.requests.RequestType;
-import org.apache.giraph.comm.requests.SaslCompleteRequest;
-import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * Generate SASL response tokens to client SASL tokens, allowing clients to
- * authenticate themselves with this server.
- */
-public class SaslServerHandler extends
-    SimpleChannelUpstreamHandler {
-    /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SaslServerHandler.class);
-
-  // TODO: Move out into a separate, dedicated handler: ("FirstRequestHandler")
-  // or similar.
-  /** Already closed first request? */
-  private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
-
-  /** Close connection on first request (used for simulating failure) */
-  private final boolean closeFirstRequest;
-  /** Used to store Hadoop Job Tokens to authenticate clients. */
-  private JobTokenSecretManager secretManager;
-
-  /**
-   * Constructor
-   *
-   * @param conf Configuration
-   */
-  public SaslServerHandler(
-      Configuration conf) throws IOException {
-    SaslNettyServer.init(conf);
-    setupSecretManager(conf);
-    closeFirstRequest = conf.getBoolean(
-        GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
-        GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
-  }
-
-  @Override
-  public void messageReceived(
-      ChannelHandlerContext ctx, MessageEvent e) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
-    }
-
-    WritableRequest writableRequest = (WritableRequest) e.getMessage();
-    // Simulate a closed connection on the first request (if desired)
-    // TODO: Move out into a separate, dedicated handler.
-    if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
-      LOG.info("messageReceived: Simulating closing channel on first " +
-          "request " + writableRequest.getRequestId() + " from " +
-          writableRequest.getClientId());
-      ALREADY_CLOSED_FIRST_REQUEST = true;
-      ctx.getChannel().close();
-      return;
-    }
-
-    if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
-      // initialize server-side SASL functionality, if we haven't yet
-      // (in which case we are looking at the first SASL message from the
-      // client).
-      SaslNettyServer saslNettyServer =
-          NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
-      if (saslNettyServer == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("No saslNettyServer for " + ctx.getChannel() +
-              " yet; creating now, with secret manager: " + secretManager);
-        }
-        saslNettyServer = new SaslNettyServer(secretManager);
-        NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(),
-            saslNettyServer);
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Found existing saslNettyServer on server:" +
-              ctx.getChannel().getLocalAddress() + " for client " +
-              ctx.getChannel().getRemoteAddress());
-        }
-      }
-
-      ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
-      // Send response to client.
-      ctx.getChannel().write(writableRequest);
-      if (saslNettyServer.isComplete()) {
-        // If authentication of client is complete, we will also send a
-        // SASL-Complete message to the client.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL authentication is complete for client with " +
-              "username: " + saslNettyServer.getUserName());
-        }
-        SaslCompleteRequest saslComplete = new SaslCompleteRequest();
-        ctx.getChannel().write(saslComplete);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
-              "authentication is complete.");
-        }
-        ctx.getPipeline().remove(this);
-      }
-      // do not send upstream to other handlers: no further action needs to be
-      // done for SASL_TOKEN_MESSAGE_REQUEST requests.
-      return;
-    } else {
-      // Client should not be sending other-than-SASL messages before
-      // SaslServerHandler has removed itself from the pipeline. Such non-SASL
-      // requests will be denied by the Authorize channel handler (the next
-      // handler upstream in the server pipeline) if SASL authentication has
-      // not completed.
-      LOG.warn("Sending upstream an unexpected non-SASL message :  " +
-          writableRequest);
-      ctx.sendUpstream(e);
-    }
-  }
-
-  /**
-   * Load Hadoop Job Token into secret manager.
-   *
-   * @param conf Configuration
-   * @throws IOException
-   */
-  private void setupSecretManager(Configuration conf) throws IOException {
-    secretManager = new JobTokenSecretManager();
-    String localJobTokenFile = System.getenv().get(
-        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    if (localJobTokenFile == null) {
-      throw new IOException("Could not find job credentials: environment " +
-          "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
-          " was not defined.");
-    }
-    JobConf jobConf = new JobConf(conf);
-
-    // Find the JobTokenIdentifiers among all the tokens available in the
-    // jobTokenFile and store them in the secretManager.
-    Credentials credentials =
-        TokenCache.loadTokens(localJobTokenFile, jobConf);
-    Collection<Token<? extends TokenIdentifier>> collection =
-        credentials.getAllTokens();
-    for (Token<? extends TokenIdentifier> token:  collection) {
-      TokenIdentifier tokenIdentifier = decodeIdentifier(token,
-          JobTokenIdentifier.class);
-      if (tokenIdentifier instanceof JobTokenIdentifier) {
-        Token<JobTokenIdentifier> theToken =
-            (Token<JobTokenIdentifier>) token;
-        JobTokenIdentifier jobTokenIdentifier =
-            (JobTokenIdentifier) tokenIdentifier;
-        secretManager.addTokenForJob(
-            jobTokenIdentifier.getJobId().toString(), theToken);
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("loaded JobToken credentials: " + credentials + " from " +
-          "localJobTokenFile: " + localJobTokenFile);
-    }
-  }
-
-  /**
-   * Get the token identifier object, or null if it could not be constructed
-   * (because the class could not be loaded, for example).
-   * Hadoop 2.0.0 (and older Hadoop2 versions? (verify)) need this.
-   * Hadoop 2.0.1 and newer have a Token.decodeIdentifier() method and do not
-   * need this. Might want to create a munge flag to distinguish 2.0.0 vs newer.
-   *
-   * @param token the token to decode into a TokenIdentifier
-   * @param cls the subclass of TokenIdentifier to decode the token into.
-   * @return the token identifier.
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private TokenIdentifier decodeIdentifier(
-      Token<? extends TokenIdentifier> token,
-      Class<? extends TokenIdentifier> cls) throws IOException {
-    TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
-    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
-    DataInputStream in = new DataInputStream(buf);
-    tokenIdentifier.readFields(in);
-    in.close();
-    return tokenIdentifier;
-  }
-
-  /** Factory for {@link SaslServerHandler} */
-  public static class Factory {
-    /**
-     * Constructor
-     */
-    public Factory() {
-    }
-    /**
-     * Create new {@link SaslServerHandler}
-     *
-     * @param conf Configuration to use
-     * @return New {@link SaslServerHandler}
-     */
-    public SaslServerHandler newHandler(
-        Configuration conf) throws IOException {
-      return new SaslServerHandler(conf);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
deleted file mode 100644
index cf45292..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import org.apache.giraph.comm.netty.NettyServer;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.utils.IncreasingBitSet;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.MapMaker;
-
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Provides a thread-safe map for checking worker and request id pairs
- */
-public class WorkerRequestReservedMap {
-  /** Map of the worker ids to the requests received (bit set) */
-  private final ConcurrentMap<Integer, IncreasingBitSet>
-  workerRequestReservedMap;
-
-  /**
-   * Constructor
-   *
-   * @param conf Configuration
-   */
-  public WorkerRequestReservedMap(Configuration conf) {
-    workerRequestReservedMap = new MapMaker().concurrencyLevel(
-        conf.getInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
-            NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
-  }
-
-  /**
-   * Reserve the request (before the request starts to ensure that it is
-   * only executed once).  We are assuming no failure on the server.
-   *
-   * @param workerId workerId of the request
-   * @param requestId Request id
-   * @return True if the reserving succeeded, false otherwise
-   */
-  public boolean reserveRequest(Integer workerId, long requestId) {
-    IncreasingBitSet requestSet = getRequestSet(workerId);
-    return requestSet.add(requestId);
-  }
-
-  /**
-   * Get and create the entry as necessary to get the request bit set.
-   *
-   * @param workerId Id of the worker to get the bit set for
-   * @return Bit set for the worker
-   */
-  private IncreasingBitSet getRequestSet(Integer workerId) {
-    IncreasingBitSet requestSet = workerRequestReservedMap.get(workerId);
-    if (requestSet == null) {
-      requestSet = new IncreasingBitSet();
-      IncreasingBitSet previous =
-          workerRequestReservedMap.putIfAbsent(workerId, requestSet);
-      if (previous != null) {
-        requestSet = previous;
-      }
-    }
-    return requestSet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
deleted file mode 100644
index b4e7dda..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.requests.WorkerRequest;
-import org.apache.giraph.graph.TaskInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Handler for requests on worker
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class WorkerRequestServerHandler<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    RequestServerHandler<WorkerRequest<I, V, E, M>> {
-  /** Data that can be accessed for handling requests */
-  private final ServerData<I, V, E, M> serverData;
-
-  /**
-   * Constructor with external server data
-   *
-   * @param serverData               Data held by the server
-   * @param workerRequestReservedMap Worker request reservation map
-   * @param conf                     Configuration
-   * @param myTaskInfo               Current task info
-   */
-  public WorkerRequestServerHandler(ServerData<I, V, E, M> serverData,
-      WorkerRequestReservedMap workerRequestReservedMap,
-      ImmutableClassesGiraphConfiguration conf,
-      TaskInfo myTaskInfo) {
-    super(workerRequestReservedMap, conf, myTaskInfo);
-    this.serverData = serverData;
-  }
-
-  @Override
-  public void processRequest(WorkerRequest<I, V, E, M> request) {
-    request.doRequest(serverData);
-  }
-
-  /** Factory for {@link WorkerRequestServerHandler} */
-  public static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable> implements
-      RequestServerHandler.Factory {
-    /** Data that can be accessed for handling requests */
-    private final ServerData<I, V, E, M> serverData;
-
-    /**
-     * Constructor
-     *
-     * @param serverData Data held by the server
-     */
-    public Factory(ServerData<I, V, E, M> serverData) {
-      this.serverData = serverData;
-    }
-
-    @Override
-    public RequestServerHandler newHandler(
-        WorkerRequestReservedMap workerRequestReservedMap,
-        ImmutableClassesGiraphConfiguration conf,
-        TaskInfo myTaskInfo) {
-      return new WorkerRequestServerHandler<I, V, E,
-          M>(serverData, workerRequestReservedMap, conf, myTaskInfo);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/handler/package-info.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/package-info.java b/giraph/src/main/java/org/apache/giraph/comm/netty/handler/package-info.java
deleted file mode 100644
index e99c29e..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/handler/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package for classes which handle requests and responses.
- */
-package org.apache.giraph.comm.netty.handler;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/netty/package-info.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/netty/package-info.java b/giraph/src/main/java/org/apache/giraph/comm/netty/package-info.java
deleted file mode 100644
index 2430366..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/netty/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package for netty implementations.
- */
-package org.apache.giraph.comm.netty;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/package-info.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/package-info.java b/giraph/src/main/java/org/apache/giraph/comm/package-info.java
deleted file mode 100644
index 616b8b5..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of communication related objects, IPC service.
- */
-package org.apache.giraph.comm;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
deleted file mode 100644
index 7107228..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Abstract request which has a byte array as its data
- */
-public abstract class ByteArrayRequest extends WritableRequest {
-  /** Request data */
-  private byte[] data;
-
-  /**
-   * Constructor
-   *
-   * @param data Request data
-   */
-  ByteArrayRequest(byte[] data) {
-    this.data = data;
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  ByteArrayRequest() {
-  }
-
-  /**
-   * Get request data
-   *
-   * @return Request data
-   */
-  public byte[] getData() {
-    return data;
-  }
-
-  /**
-   * Get request data in the form of {@link DataInput}
-   *
-   * @return Request data as {@link DataInput}
-   */
-  public DataInput getDataInput() {
-    return new DataInputStream(new ByteArrayInputStream(data));
-  }
-
-  @Override
-  void readFieldsRequest(DataInput input) throws IOException {
-    int dataLength = input.readInt();
-    data = new byte[dataLength];
-    input.readFully(data);
-  }
-
-  @Override
-  void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(data.length);
-    output.write(data);
-  }
-
-  @Override
-  public int getSerializedSize() {
-    // 4 for the length of data, plus number of data bytes
-    return super.getSerializedSize() + 4 + data.length;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
deleted file mode 100644
index 18a23ea..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.graph.MasterAggregatorHandler;
-
-/**
- * Interface for requests sent to master to extend
- */
-public interface MasterRequest {
-  /**
-   * Execute the request
-   *
-   * @param aggregatorHandler Master aggregator handler
-   */
-  void doRequest(MasterAggregatorHandler aggregatorHandler);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
deleted file mode 100644
index aac0028..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-/**
- * Type of the request
- */
-public enum RequestType {
-  /*if[HADOOP_NON_SECURE]
-  else[HADOOP_NON_SECURE]*/
-  /** Exchange authentication information between clients and servers */
-  SASL_TOKEN_MESSAGE_REQUEST(SaslTokenMessageRequest.class),
-  /**
-   * Used by servers to acknowledge SASL authentication completion with
-   * client, so client can modify its pipeline afterwards.
-   */
-  SASL_COMPLETE_REQUEST(SaslCompleteRequest.class),
-  /*end[HADOOP_NON_SECURE]*/
-  /** Sending vertices request */
-  SEND_VERTEX_REQUEST(SendVertexRequest.class),
-  /** Sending a partition of messages for next superstep */
-  SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
-  /**
-   * Sending a partition of messages for current superstep
-   * (used during partition exchange)
-   */
-  SEND_PARTITION_CURRENT_MESSAGES_REQUEST
-      (SendPartitionCurrentMessagesRequest.class),
-  /** Send a partition of mutations */
-  SEND_PARTITION_MUTATIONS_REQUEST(SendPartitionMutationsRequest.class),
-  /** Send aggregated values from one worker's vertices */
-  SEND_WORKER_AGGREGATORS_REQUEST(SendWorkerAggregatorsRequest.class),
-  /** Send aggregated values from worker owner to master */
-  SEND_AGGREGATORS_TO_MASTER_REQUEST(SendAggregatorsToMasterRequest.class),
-  /** Send aggregators from master to worker owners */
-  SEND_AGGREGATORS_TO_OWNER_REQUEST(SendAggregatorsToOwnerRequest.class),
-  /** Send aggregators from worker owner to other workers */
-  SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class);
-
-  /** Class of request which this type corresponds to */
-  private final Class<? extends WritableRequest> requestClass;
-
-  /**
-   * Constructor
-   *
-   * @param requestClass Class of request which this type corresponds to
-   */
-  private RequestType(Class<? extends WritableRequest> requestClass) {
-    this.requestClass = requestClass;
-  }
-
-  /**
-   * Get class of request which this type corresponds to
-   *
-   * @return Class of request which this type corresponds to
-   */
-  public Class<? extends WritableRequest> getRequestClass() {
-    return requestClass;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
deleted file mode 100644
index 938d59f..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Reply from server to client after SASL authentication completes.
- */
-public class SaslCompleteRequest extends WritableRequest {
-  /**
-   * Constructor used for reflection and sending.
-   */
-  public SaslCompleteRequest() {
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SASL_COMPLETE_REQUEST;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
deleted file mode 100644
index d939b6b..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.netty.SaslNettyServer;
-import org.apache.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Send and receive SASL tokens.
- */
-public class SaslTokenMessageRequest extends WritableRequest {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SaslTokenMessageRequest.class);
-
-  /** Used for client or server's token to send or receive from each other. */
-  private byte[] token;
-
-  /**
-   * Constructor used for reflection only.
-   */
-  public SaslTokenMessageRequest() { }
-
- /**
-   * Constructor used to send request.
-   *
-   * @param token the SASL token, generated by a SaslClient or SaslServer.
-   */
-  public SaslTokenMessageRequest(byte[] token) {
-    this.token = token;
-  }
-
-  /**
-   * Read accessor for SASL token
-   *
-   * @return saslToken SASL token
-   */
-  public byte[] getSaslToken() {
-    return token;
-  }
-
-  /**
-   * Write accessor for SASL token
-   *
-   * @param token SASL token
-   */
-  public void setSaslToken(byte[] token) {
-    this.token = token;
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SASL_TOKEN_MESSAGE_REQUEST;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    int tokenSize = input.readInt();
-    token = new byte[tokenSize];
-    input.readFully(token);
-  }
-
-  /**
-   * Update server's token in response to the SASL token received from
-   * client. Updated token is sent to client by
-   * SaslServerHandler.messageReceived().
-   *
-   * @param saslNettyServer used to create response.
-   */
-
-  public void processToken(SaslNettyServer saslNettyServer) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("processToken:  With nettyServer: " + saslNettyServer +
-          " and token length: " + token.length);
-    }
-    token = saslNettyServer.response(token);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("processToken: Response token's length is:" + token.length);
-    }
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(token.length);
-    output.write(token);
-  }
-
-  @Override
-  public int getSerializedSize() {
-    return super.getSerializedSize() + 4 + token.length;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
deleted file mode 100644
index 104e507..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.graph.MasterAggregatorHandler;
-
-import java.io.IOException;
-
-/**
- * Request to send final aggregated values from worker which owns
- * aggregators to the master
- */
-public class SendAggregatorsToMasterRequest extends ByteArrayRequest
-    implements MasterRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendAggregatorsToMasterRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendAggregatorsToMasterRequest() {
-  }
-
-  @Override
-  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
-    try {
-      aggregatorHandler.acceptAggregatedValues(getDataInput());
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
deleted file mode 100644
index b102cfe..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.graph.Aggregator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * Request to send final aggregatd values from master to worker which owns
- * the aggregators
- */
-public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
-    implements WorkerRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendAggregatorsToOwnerRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendAggregatorsToOwnerRequest() {
-  }
-
-  @Override
-  public void doRequest(ServerData serverData) {
-    DataInput input = getDataInput();
-    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
-    try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromMaster(count.get());
-        } else {
-          Class<Aggregator<Writable>> aggregatorClass =
-              AggregatorUtils.getAggregatorClass(aggregatorClassName);
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorClass);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
-          serverData.getOwnerAggregatorData().registerAggregator(
-              aggregatorName, aggregatorClass);
-        }
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-    aggregatorData.receivedRequestFromMaster(getData());
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
deleted file mode 100644
index 0efd18d..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.graph.Aggregator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * Request to send final aggregated values from worker which owns them to
- * other workers
- */
-public class SendAggregatorsToWorkerRequest extends
-    ByteArrayRequest implements WorkerRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendAggregatorsToWorkerRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendAggregatorsToWorkerRequest() {
-  }
-
-  @Override
-  public void doRequest(ServerData serverData) {
-    DataInput input = getDataInput();
-    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
-    try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get());
-        } else {
-          Class<Aggregator<Writable>> aggregatorClass =
-              AggregatorUtils.getAggregatorClass(aggregatorClassName);
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorClass);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
-        }
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-    aggregatorData.receivedRequestFromWorker();
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
deleted file mode 100644
index 037f4a0..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Send a collection of vertex messages for a partition. It adds messages to
- * current message store and it should be used only during partition exchange.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
-  V extends Writable, E extends Writable, M extends Writable> extends
-  WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** Destination partition for these vertices' messages*/
-  private int partitionId;
-  /** Map of destination vertex ID's to message lists */
-  private ByteArrayVertexIdMessages<I, M> vertexIdMessageMap;
-
-  /** Constructor used for reflection only */
-  public SendPartitionCurrentMessagesRequest() { }
-
-  /**
-   * Constructor used to send request.
-   *
-   * @param partitionId Partition to send the request to
-   * @param vertexIdMessages Map of messages to send
-   */
-  public SendPartitionCurrentMessagesRequest(int partitionId,
-    ByteArrayVertexIdMessages<I, M> vertexIdMessages) {
-    super();
-    this.partitionId = partitionId;
-    this.vertexIdMessageMap = vertexIdMessages;
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    partitionId = input.readInt();
-    vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>();
-    vertexIdMessageMap.setConf(getConf());
-    vertexIdMessageMap.initialize();
-    vertexIdMessageMap.readFields(input);
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionId);
-    vertexIdMessageMap.write(output);
-  }
-
-  @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
-    try {
-      serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
-          vertexIdMessageMap);
-    } catch (IOException e) {
-      throw new RuntimeException("doRequest: Got IOException ", e);
-    }
-  }
-
-  @Override
-  public int getSerializedSize() {
-    return super.getSerializedSize() + 4 +
-        vertexIdMessageMap.getSerializedSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
deleted file mode 100644
index 22e4944..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.VertexMutations;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Send a collection of vertex mutations for a partition.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SendPartitionMutationsRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendPartitionMutationsRequest.class);
-  /** Partition id */
-  private int partitionId;
-  /** Mutations sent for a partition */
-  private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendPartitionMutationsRequest() { }
-
-  /**
-   * Constructor used to send request.
-   *
-   * @param partitionId Partition to send the request to
-   * @param vertexIdMutations Map of mutations to send
-   */
-  public SendPartitionMutationsRequest(
-      int partitionId,
-      Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
-    this.partitionId = partitionId;
-    this.vertexIdMutations = vertexIdMutations;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    partitionId = input.readInt();
-    int vertexIdMutationsSize = input.readInt();
-    vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
-    for (int i = 0; i < vertexIdMutationsSize; ++i) {
-      I vertexId = getConf().createVertexId();
-      vertexId.readFields(input);
-      VertexMutations<I, V, E, M> vertexMutations =
-          new VertexMutations<I, V, E, M>();
-      vertexMutations.setConf(getConf());
-      vertexMutations.readFields(input);
-      if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
-        throw new IllegalStateException(
-            "readFields: Already has vertex id " + vertexId);
-      }
-    }
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionId);
-    output.writeInt(vertexIdMutations.size());
-    for (Entry<I, VertexMutations<I, V, E, M>> entry :
-        vertexIdMutations.entrySet()) {
-      entry.getKey().write(output);
-      entry.getValue().write(output);
-    }
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_PARTITION_MUTATIONS_REQUEST;
-  }
-
-  @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
-    ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
-      serverData.getVertexMutations();
-    for (Entry<I, VertexMutations<I, V, E, M>> entry :
-        vertexIdMutations.entrySet()) {
-      VertexMutations<I, V, E, M> mutations =
-          vertexMutations.get(entry.getKey());
-      if (mutations == null) {
-        mutations = vertexMutations.putIfAbsent(
-            entry.getKey(), entry.getValue());
-        if (mutations == null) {
-          continue;
-        }
-      }
-      synchronized (mutations) {
-        mutations.addVertexMutations(entry.getValue());
-      }
-    }
-  }
-
-  @Override
-  public int getSerializedSize() {
-    return WritableRequest.UNKNOWN_SIZE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
deleted file mode 100644
index 4bc9daa..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Send a collection of vertices for a partition.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SendVertexRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendVertexRequest.class);
-  /** Partition */
-  private Partition<I, V, E, M> partition;
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendVertexRequest() { }
-
-  /**
-   * Constructor for sending a request.
-   *
-   * @param partition Partition to be sent in the request
-   */
-  public SendVertexRequest(Partition<I, V, E, M> partition) {
-    this.partition = partition;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    partition = getConf().createPartition(-1, null);
-    partition.readFields(input);
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    partition.write(output);
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_VERTEX_REQUEST;
-  }
-
-  @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
-    serverData.getPartitionStore().addPartition(partition);
-  }
-
-  @Override
-  public int getSerializedSize() {
-    return WritableRequest.UNKNOWN_SIZE;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
deleted file mode 100644
index 264f03a..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * Request to send partial aggregated values for current superstep (values
- * which were computed by one worker's vertices)
- */
-public class SendWorkerAggregatorsRequest extends
-    ByteArrayRequest implements WorkerRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendWorkerAggregatorsRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendWorkerAggregatorsRequest() {
-  }
-
-  @Override
-  public void doRequest(ServerData serverData) {
-    DataInput input = getDataInput();
-    OwnerAggregatorServerData aggregatorData =
-        serverData.getOwnerAggregatorData();
-    try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        if (aggregatorName.equals(
-            AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get());
-        } else {
-          Writable aggregatedValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatedValue.readFields(input);
-          aggregatorData.aggregate(aggregatorName, aggregatedValue);
-        }
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-    aggregatorData.receivedRequestFromWorker();
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
deleted file mode 100644
index 641c795..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.PairList;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Send a collection of vertex messages, grouped by partition, that are all
- * destined for a single worker.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SendWorkerMessagesRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendWorkerMessagesRequest.class);
-  /**
-   * All messages for a group of vertices, organized by partition, which
-   * are owned by a single (destination) worker. These messages are all
-   * destined for this worker.
-   * */
-  private PairList<Integer, ByteArrayVertexIdMessages<I, M>>
-  partitionVertexMessages;
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendWorkerMessagesRequest() { }
-
-  /**
-   * Constructor used to send request.
-   *
-   * @param partVertMsgs List of (remote partition id,
-   *                     ByteArrayVertexIdMessages) pairs to send
-   */
-  public SendWorkerMessagesRequest(
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
-    super();
-    this.partitionVertexMessages = partVertMsgs;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    int numPartitions = input.readInt();
-    partitionVertexMessages =
-        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
-    partitionVertexMessages.initialize(numPartitions);
-    while (numPartitions-- > 0) {
-      final int partitionId = input.readInt();
-      ByteArrayVertexIdMessages<I, M> vertexIdMessages =
-          new ByteArrayVertexIdMessages<I, M>();
-      vertexIdMessages.setConf(getConf());
-      vertexIdMessages.readFields(input);
-      partitionVertexMessages.add(partitionId, vertexIdMessages);
-    }
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionVertexMessages.getSize());
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      output.writeInt(iterator.getCurrentFirst());
-      iterator.getCurrentSecond().write(output);
-    }
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_WORKER_MESSAGES_REQUEST;
-  }
-
-  @Override
-  public void doRequest(ServerData<I, V, E, M> serverData) {
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      try {
-        serverData.getIncomingMessageStore().
-            addPartitionMessages(iterator.getCurrentFirst(),
-                iterator.getCurrentSecond());
-      } catch (IOException e) {
-        throw new RuntimeException("doRequest: Got IOException ", e);
-      }
-    }
-  }
-
-  @Override
-  public int getSerializedSize() {
-    int size = super.getSerializedSize() + 4;
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
-        iterator = partitionVertexMessages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      size += 4 + iterator.getCurrentSecond().getSerializedSize();
-    }
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
deleted file mode 100644
index 4d9382f..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface for requests sent to a worker to implement
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public interface WorkerRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Execute the request
-   *
-   * @param serverData Accessible data that can be mutated per the request
-   */
-  void doRequest(ServerData<I, V, E, M> serverData);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
deleted file mode 100644
index fad20b0..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstract base class for requests. It serializes the client id and request
- * id itself and leaves the remaining fields to subclasses.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public abstract class WritableRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Writable,
-    ImmutableClassesGiraphConfigurable<I, V, E, M> {
-  /**
-   * Value to use when size of the request in serialized form is not known
-   * or too expensive to calculate
-   */
-  public static final int UNKNOWN_SIZE = -1;
-
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Client id */
-  private int clientId = -1;
-  /** Request id */
-  private long requestId = -1;
-
-  public int getClientId() {
-    return clientId;
-  }
-
-  public void setClientId(int clientId) {
-    this.clientId = clientId;
-  }
-
-  public long getRequestId() {
-    return requestId;
-  }
-
-  public void setRequestId(long requestId) {
-    this.requestId = requestId;
-  }
-
-  /**
-   * Get the size of the request in serialized form. The number returned by
-   * this function can't be less than the actual size - if the size can't be
-   * calculated correctly return WritableRequest.UNKNOWN_SIZE.
-   *
-   * @return The size (in bytes) of serialized request,
-   * or WritableRequest.UNKNOWN_SIZE if the size is not known
-   * or too expensive to calculate.
-   */
-  public int getSerializedSize() {
-    // 4 for clientId, 8 for requestId
-    return 4 + 8;
-  }
-
-  /**
-   * Get the type of the request
-   *
-   * @return Request type
-   */
-  public abstract RequestType getType();
-
-  /**
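-   * Deserialize the request-specific fields (called by readFields after the
-   * client id and request id have been read)
-   *
-   * @param input Input to read fields from
-   * @throws IOException if reading from the input fails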
-   */
-  abstract void readFieldsRequest(DataInput input) throws IOException;
-
-  /**
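-   * Serialize the request-specific fields (called by write after the
-   * client id and request id have been written)
-   *
-   * @param output Output to write the request to
-   * @throws IOException if writing to the output fails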
-   */
-  abstract void writeRequest(DataOutput output) throws IOException;
-
-  @Override
-  public final ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
-  public final void setConf(ImmutableClassesGiraphConfiguration<I, V,
-      E, M> conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public final void readFields(DataInput input) throws IOException {
-    clientId = input.readInt();
-    requestId = input.readLong();
-    readFieldsRequest(input);
-  }
-
-  @Override
-  public final void write(DataOutput output) throws IOException {
-    output.writeInt(clientId);
-    output.writeLong(requestId);
-    writeRequest(output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/requests/package-info.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/requests/package-info.java b/giraph/src/main/java/org/apache/giraph/comm/requests/package-info.java
deleted file mode 100644
index d61bf1a..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/requests/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package for requests.
- */
-package org.apache.giraph.comm.requests;


Mime
View raw message