hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bs...@apache.org
Subject svn commit: r1628344 [2/2] - in /hama/trunk/core: ./ src/main/java/org/apache/hama/bsp/message/ src/main/java/org/apache/hama/ipc/ src/test/java/org/apache/hama/bsp/message/ src/test/java/org/apache/hama/ipc/
Date Tue, 30 Sep 2014 00:44:15 GMT
Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java Tue Sep 30 00:44:14
2014
@@ -0,0 +1,671 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An abstract IPC service using netty. IPC calls take a single {@link Writable}
+ * as a parameter, and return a {@link Writable}.
+ * 
+ * @see AsyncServer
+ */
+public abstract class AsyncServer {
+
+  private AuthMethod authMethod;
+  static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  static int INITIAL_RESP_BUF_SIZE = 1024;
+  UserGroupInformation user = null;
+  // 1 : Introduce ping and server does not throw away RPCs
+  // 3 : Introduce the protocol into the RPC connection header
+  // 4 : Introduced SASL security layer
+  static final byte CURRENT_VERSION = 4;
+  static final int HEADER_LENGTH = 10;
+  // follows version is read.
+  private Configuration conf;
+  private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+  private int backlogLength;;
+  InetSocketAddress address;
+  private static final Log LOG = LogFactory.getLog(AsyncServer.class);
+  private static int NIO_BUFFER_LIMIT = 8 * 1024;
+  private final int maxRespSize;
+  static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
+  static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;
+
+  private static final ThreadLocal<AsyncServer> SERVER = new ThreadLocal<AsyncServer>();
+  private int port; // port we listen on
+  private Class<? extends Writable> paramClass; // class of call parameters
+  // Configure the server.(constructor is thread num)
+  private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+  private EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String,
Class<?>>();
+  private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
+
+  /**
+   * Resolves a protocol class by name. The process-wide cache is consulted
+   * first; on a miss the class is loaded through the configuration and stored
+   * in the cache for later lookups.
+   * 
+   * @param protocolName fully qualified protocol class name
+   * @param conf configuration used to load the class on a cache miss
+   * @return the protocol class
+   * @throws ClassNotFoundException if the configuration cannot load the class
+   */
+  static Class<?> getProtocolClass(String protocolName, Configuration conf)
+      throws ClassNotFoundException {
+    Class<?> cached = PROTOCOL_CACHE.get(protocolName);
+    if (cached != null) {
+      return cached;
+    }
+    Class<?> loaded = conf.getClassByName(protocolName);
+    PROTOCOL_CACHE.put(protocolName, loaded);
+    return loaded;
+  }
+
+  /**
+   * Returns the {@code address} field, which the constructor initializes from
+   * the bind address and port it was given.
+   * 
+   * @return the InetSocketAddress held by this server
+   */
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
+  /**
+   * Returns the server instance stored in the thread-local for the current
+   * thread, or null if none has been set on this thread. May be called under
+   * {@link #call(Class, Writable, long)} implementations, and under
+   * {@link Writable} methods of parameters and return values. Permits
+   * applications to access the server context.
+   * 
+   * @return the AsyncServer registered for the calling thread, or null
+   */
+  public static AsyncServer get() {
+    return SERVER.get();
+  }
+
+  /**
+   * Constructs a server for the named port and address. Parameters passed
+   * must be of the named class. Delegates to the full constructor, using the
+   * decimal port number as the server name and no secret manager.
+   * <p>
+   * NOTE(review): <code>handlerCount</code> is documented as the number of
+   * handler threads, but the constructor this delegates to does not read it
+   * -- confirm whether it is still meant to have an effect.
+   * 
+   * @param bindAddress host name or address used to build the server address
+   * @param port port we listen on
+   * @param paramClass class of call parameters
+   * @param handlerCount requested number of handler threads (see note above)
+   * @param conf configuration the server reads its settings from
+   * @throws IOException declared by the delegate constructor
+   */
+  protected AsyncServer(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount, Configuration conf)
+      throws IOException {
+    this(bindAddress, port, paramClass, handlerCount, conf, Integer
+        .toString(port), null);
+  }
+
+  /**
+   * Constructs a named server without a secret manager by delegating to the
+   * full constructor.
+   * 
+   * @param bindAddress host name or address used to build the server address
+   * @param port port we listen on
+   * @param paramClass class of call parameters
+   * @param handlerCount forwarded unchanged to the full constructor
+   * @param conf configuration the server reads its settings from
+   * @param serverName forwarded unchanged to the full constructor
+   * @throws IOException declared by the delegate constructor
+   */
+  protected AsyncServer(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount,
+      Configuration conf, String serverName) throws IOException {
+    this(bindAddress, port, paramClass, handlerCount, conf, serverName,
+        null);
+  }
+
+  /**
+   * Full constructor: records the endpoint and parameter class, then reads
+   * the tunables (maximum response size, TCP_NODELAY, listen backlog) from
+   * the configuration. The handlerCount, serverName and secretManager
+   * arguments are accepted but not read here.
+   * 
+   * @param bindAddress host name or address used to build the server address
+   * @param port port we listen on
+   * @param paramClass class of call parameters
+   * @param handlerCount not read by this constructor
+   * @param conf configuration the server reads its settings from
+   * @param serverName not read by this constructor
+   * @param secretManager not read by this constructor
+   * @throws IOException declared for subclasses; not thrown by this body
+   */
+  protected AsyncServer(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount,
+      Configuration conf, String serverName,
+      SecretManager<? extends TokenIdentifier> secretManager)
+      throws IOException {
+    // endpoint and call parameter type
+    this.paramClass = paramClass;
+    this.port = port;
+    this.address = new InetSocketAddress(bindAddress, port);
+
+    // settings read from the configuration
+    this.conf = conf;
+    this.backlogLength = conf.getInt("ipc.server.listen.queue.size", 100);
+    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
+    this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
+        IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+  }
+
+  /** start server listener */
+  public void start() {
+    new NioServerListener().start();
+  }
+
+  /**
+   * Thread that bootstraps the netty server, binds the listening port and
+   * then blocks until the server channel is closed. Both event loop groups
+   * are shut down when the thread leaves {@link #run()}.
+   */
+  private class NioServerListener extends Thread {
+
+    /**
+     * Configure and start nio server. Failures are reported through the class
+     * logger; an interrupt is restored on the thread before shutting down.
+     */
+    @Override
+    public void run() {
+      SERVER.set(AsyncServer.this);
+      try {
+        // ServerBootstrap is a helper class that sets up a server
+        ServerBootstrap b = new ServerBootstrap();
+        b.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .option(ChannelOption.SO_BACKLOG, backlogLength)
+            .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
+            .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
+            .childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                new FixedRecvByteBufAllocator(100 * 1024))
+
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+              @Override
+              public void initChannel(SocketChannel ch) throws Exception {
+                ChannelPipeline p = ch.pipeline();
+                // Register accumulation processing handler
+                p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
+                // Register message processing handler
+                p.addLast(new NioServerInboundHandler());
+              }
+            });
+
+        // Bind and start to accept incoming connections.
+        // NOTE(review): binds on the port only; the bind address stored in
+        // 'address' is not used here -- confirm this is intended.
+        ChannelFuture f = b.bind(port).sync();
+        LOG.info("AsyncServer startup");
+        // Wait until the server socket is closed.
+        f.channel().closeFuture().sync();
+      } catch (InterruptedException ie) {
+        // Keep the interrupt visible to code running later on this thread
+        Thread.currentThread().interrupt();
+        LOG.warn("AsyncServer listener interrupted", ie);
+      } catch (Exception e) {
+        // Report through the logger instead of printing to stderr
+        LOG.error("AsyncServer listener failed", e);
+      } finally {
+        // Shut down Server gracefully
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+      }
+    }
+  }
+
+  /** Stops the server gracefully. */
+  public void stop() {
+    if (bossGroup != null && !bossGroup.isTerminated()) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null && !workerGroup.isTerminated()) {
+      workerGroup.shutdownGracefully();
+    }
+    LOG.info("AsyncServer gracefully shutdown");
+  }
+
+  /**
+   * Frame decoder that accumulates received bytes until a complete frame, as
+   * announced by the length field of the message, is available.
+   */
+  public class NioFrameDecoder extends LengthFieldBasedFrameDecoder {
+
+    /**
+     * @param maxFrameLength - the maximum length of the frame
+     * @param lengthFieldOffset - the offset of the length field
+     * @param lengthFieldLength - the length of the length field
+     * @param lengthAdjustment - the compensation value to add to the value of
+     *          the length field
+     * @param initialBytesToStrip - the number of first bytes to strip out from
+     *          the decoded frame
+     */
+    public NioFrameDecoder(int maxFrameLength, int lengthFieldOffset,
+        int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
+      super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
+          lengthAdjustment, initialBytesToStrip);
+    }
+
+    /**
+     * Delegates to the parent decoder and hands back its result as a
+     * {@link ByteBuf}; a null result from the parent is passed through
+     * unchanged.
+     * 
+     * @param ctx
+     * @param in
+     * @return the decoded frame, or null
+     */
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
+        throws Exception {
+      // the cast is a no-op for null, so no separate null branch is needed
+      return (ByteBuf) super.decode(ctx, in);
+    }
+  }
+
+  /**
+   * This class process received message from client and send response message.
+   */
+  private class NioServerInboundHandler extends ChannelInboundHandlerAdapter {
+    ConnectionHeader header = new ConnectionHeader();
+    Class<?> protocol;
+    private String errorClass = null;
+    private String error = null;
+    private boolean rpcHeaderRead = false; // if initial rpc header is read
+    private boolean headerRead = false; // if the connection header that follows
+                                        // version is read.
+
+    /**
+     * Invoked when a connection is established and ready to generate
+     * traffic. Registers the enclosing server in the thread-local of the
+     * calling thread so that {@link AsyncServer#get()} returns it there.
+     * 
+     * @param ctx
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+      SERVER.set(AsyncServer.this);
+    }
+
+    /**
+     * Processes a received message from the client. This method is called
+     * with the received message whenever new data is received from a client.
+     * The message is cast to a {@link ByteBuf} and consumed in a loop: on the
+     * first pass the rpc header (the "hrpc" bytes, the version and the auth
+     * method) is checked, afterwards each length-prefixed payload is handed
+     * to processOneRpc. A response is sent through sendResponse at the end of
+     * every pass that neither returns nor continues. The message is released
+     * in the finally block.
+     * 
+     * @param ctx
+     * @param msg the inbound message; cast to {@link ByteBuf} without a check
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      ByteBuffer dataLengthBuffer = ByteBuffer.allocate(4);
+      ByteBuf byteBuf = (ByteBuf) msg;
+
+      ByteBuffer data = null;
+      ByteBuffer rpcHeaderBuffer = null;
+      try {
+        while (true) {
+          Call call = null;
+          errorClass = null;
+          error = null;
+          try {
+            if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) {
+              byteBuf.readBytes(dataLengthBuffer);
+              if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) {
+                return;
+              }
+            } else {
+              return;
+            }
+
+            // read rpcHeader
+            if (!rpcHeaderRead) {
+              // Every connection is expected to send the header.
+              if (rpcHeaderBuffer == null) {
+                dataLengthBuffer = null;
+                dataLengthBuffer = ByteBuffer.allocate(4);
+                byteBuf.readBytes(dataLengthBuffer);
+                rpcHeaderBuffer = ByteBuffer.allocate(2);
+              }
+              byteBuf.readBytes(rpcHeaderBuffer);
+              if (!rpcHeaderBuffer.hasArray()
+                  || rpcHeaderBuffer.remaining() > 0) {
+                return;
+              }
+              int version = rpcHeaderBuffer.get(0);
+              byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
+              try {
+                authMethod = AuthMethod.read(new DataInputStream(
+                    new ByteArrayInputStream(method)));
+                dataLengthBuffer.flip();
+              } catch (IOException ioe) {
+                errorClass = ioe.getClass().getName();
+                error = StringUtils.stringifyException(ioe);
+              }
+
+              if (!HEADER.equals(dataLengthBuffer)
+                  || version != CURRENT_VERSION) {
+                LOG.warn("Incorrect header or version mismatch from "
+                    + address.getHostName() + ":" + address.getPort()
+                    + " got version " + version + " expected version "
+                    + CURRENT_VERSION);
+                return;
+              }
+              dataLengthBuffer.clear();
+              if (authMethod == null) {
+                throw new RuntimeException(
+                    "Unable to read authentication method");
+              }
+              rpcHeaderBuffer = null;
+              rpcHeaderRead = true;
+              continue;
+            }
+
+            // read data length and allocate buffer;
+            if (data == null) {
+              dataLengthBuffer.flip();
+              int dataLength = dataLengthBuffer.getInt();
+              if (dataLength < 0) {
+                LOG.warn("Unexpected data length " + dataLength + "!! from "
+                    + address.getHostName());
+              }
+              data = ByteBuffer.allocate(dataLength);
+            }
+
+            // read received data
+            byteBuf.readBytes(data);
+            if (data.remaining() == 0) {
+              dataLengthBuffer.clear();
+              data.flip();
+              boolean isHeaderRead = headerRead;
+              call = processOneRpc(data.array());
+              data = null;
+              if (!isHeaderRead) {
+                continue;
+              }
+            }
+          } catch (OutOfMemoryError oome) {
+            // we can run out of memory if we have too many threads
+            // log the event and sleep for a minute and give
+            // some thread(s) a chance to finish
+            //
+            LOG.warn("Out of Memory in server select", oome);
+            try {
+              Thread.sleep(60000);
+              errorClass = oome.getClass().getName();
+              error = StringUtils.stringifyException(oome);
+            } catch (Exception ie) {
+            }
+          } catch (Exception e) {
+            LOG.warn("Exception in Responder "
+                + StringUtils.stringifyException(e));
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
+          }
+          sendResponse(ctx, call);
+        }
+      } finally {
+        ReferenceCountUtil.release(msg);
+      }
+    }
+
+    /**
+     * Invokes the call and sends the serialized result (or error) to the
+     * client.
+     * <p>
+     * When no call could be decoded there is no call id to answer, so the
+     * method logs the pending error, if any, and returns without writing
+     * anything to the channel.
+     * 
+     * @param ctx channel context the response is written to
+     * @param call the decoded call, or null if the read path failed before a
+     *          call object existed
+     */
+    private void sendResponse(ChannelHandlerContext ctx, Call call) {
+      if (call == null) {
+        // Nothing to invoke and no id to respond with; avoid dereferencing
+        // the missing call.
+        if (error != null) {
+          LOG.warn(this.getClass().getName()
+              + ": no call decoded, dropping response: " + error);
+        }
+        return;
+      }
+
+      ByteArrayOutputStream buf = new ByteArrayOutputStream(
+          INITIAL_RESP_BUF_SIZE);
+      Writable value = null;
+      try {
+        value = call(protocol, call.param, call.timestamp);
+      } catch (Throwable e) {
+        String logMsg = this.getClass().getName() + ", call " + call
+            + ": error: " + e;
+        if (e instanceof RuntimeException || e instanceof Error) {
+          // These exception types indicate something is probably wrong
+          // on the server side, as opposed to just a normal exceptional
+          // result.
+          LOG.warn(logMsg, e);
+        } else if (exceptionsHandler.isTerse(e.getClass())) {
+          // Don't log the whole stack trace of these exceptions.
+          // Way too noisy!
+          LOG.info(logMsg);
+        } else {
+          LOG.info(logMsg, e);
+        }
+        errorClass = e.getClass().getName();
+        error = StringUtils.stringifyException(e);
+      }
+      try {
+        setupResponse(buf, call, (error == null) ? Status.SUCCESS
+            : Status.ERROR, value, errorClass, error);
+        if (buf.size() > maxRespSize) {
+          // The serialized bytes already live in call.response and buf is
+          // local to this method, so a warning is all that is needed here.
+          LOG.warn("Large response size " + buf.size() + " for call "
+              + call.toString());
+        }
+        // send response data;
+        channelWrite(ctx, call.response);
+      } catch (Exception e) {
+        LOG.info(this.getClass().getName() + " caught: "
+            + StringUtils.stringifyException(e));
+        error = null;
+      } finally {
+        IOUtils.closeStream(buf);
+      }
+    }
+
+    /**
+     * Dispatches one received payload. The first payload on a connection is
+     * treated as the connection header; every later payload is treated as
+     * call data.
+     * 
+     * @param buf payload bytes
+     * @return the decoded call, or null when the payload was the header
+     */
+    private Call processOneRpc(byte[] buf) throws IOException {
+      if (!headerRead) {
+        processHeader(buf);
+        headerRead = true;
+        return null;
+      }
+      return processData(buf);
+    }
+
+    /**
+     * Reads the connection header that follows the version, resolves the
+     * protocol class it names (if any) and records the header's user on the
+     * enclosing server. Any failure while reading is rethrown wrapped in a
+     * RuntimeException.
+     * 
+     * @param buf buffer
+     */
+    private void processHeader(byte[] buf) {
+      ByteArrayInputStream bytes = new ByteArrayInputStream(buf);
+      DataInputStream headerIn = new DataInputStream(bytes);
+      try {
+        header.readFields(headerIn);
+        if (header.getProtocol() != null) {
+          protocol = getProtocolClass(header.getProtocol(), conf);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        IOUtils.closeStream(headerIn);
+      }
+
+      user = header.getUgi();
+    }
+
+    /**
+     * Reads the received data and creates the call object: an int call id is
+     * read first, then a new instance of the parameter class is created and
+     * populated from the remaining bytes. Any failure is rethrown wrapped in
+     * a RuntimeException.
+     * 
+     * @param buf serialized call: the call id followed by the parameter
+     * @return the IPC Call
+     */
+    private Call processData(byte[] buf) {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
+      try {
+        int id = dis.readInt(); // try to read an id
+
+        if (LOG.isDebugEnabled())
+          LOG.debug(" got #" + id);
+        Writable param = ReflectionUtils.newInstance(paramClass, conf);
+        param.readFields(dis); // try to read param data
+
+        Call call = new Call(id, param, this);
+
+        return call;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        IOUtils.closeStream(dis);
+      }
+    }
+  }
+
+  /**
+   * Setup response for the IPC Call. The buffer is reset, then the call id
+   * and status are written, followed by either the return value (on success)
+   * or the error class and error message. The serialized bytes are stored on
+   * the call. The stream is closed even when serialization fails.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param status {@link Status} of the IPC call
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the the call failed
+   * @param error error message, if the call failed
+   * @throws IOException if serialization fails, or if the status is SUCCESS
+   *           but no return value was supplied
+   */
+  private void setupResponse(ByteArrayOutputStream response, Call call,
+      Status status, Writable rv, String errorClass, String error)
+      throws IOException {
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    try {
+      out.writeInt(call.id); // write call id
+      out.writeInt(status.state); // write status
+
+      if (status == Status.SUCCESS) {
+        if (rv == null) {
+          // Fail with a descriptive error rather than a bare NPE
+          throw new IOException("Call #" + call.id
+              + " succeeded but returned no value to serialize");
+        }
+        rv.write(out);
+      } else {
+        WritableUtils.writeString(out, errorClass);
+        WritableUtils.writeString(out, error);
+      }
+      out.flush();
+      call.setResponse(ByteBuffer.wrap(response.toByteArray()));
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+
+  /**
+   * Copies the readable bytes of the given buffer into a netty buffer and
+   * writes it to the channel in a single writeAndFlush call.
+   * <p>
+   * Only the bytes between the buffer's position and limit are sent, and the
+   * caller's buffer is left untouched because a duplicate is read. If the
+   * copy fails, the netty buffer is released before the failure is logged.
+   * Failures are logged and not propagated to the caller.
+   * 
+   * @param ctx channel context to write to
+   * @param buffer response bytes to send
+   */
+  private void channelWrite(ChannelHandlerContext ctx, ByteBuffer buffer) {
+    try {
+      // duplicate() shares content but has its own position/limit
+      ByteBuffer src = buffer.duplicate();
+      ByteBuf buf = ctx.alloc().buffer(src.remaining());
+      try {
+        buf.writeBytes(src);
+      } catch (RuntimeException e) {
+        // still owned by us at this point, so give it back to the allocator
+        buf.release();
+        throw e;
+      }
+      // from here on the channel pipeline is responsible for releasing buf
+      ctx.writeAndFlush(buf);
+    } catch (Throwable e) {
+      LOG.error("Failed to write response to channel", e);
+    }
+  }
+
+  /** A call queued for handling: id, parameter, origin and response bytes. */
+  private static class Call {
+    private int id; // the client's call id
+    private Writable param; // the parameter passed
+    private ChannelInboundHandlerAdapter connection; // connection to client
+    private long timestamp; // set to the current time in the constructor
+    // NOTE(review): no code visible here updates timestamp after construction
+    private ByteBuffer response; // the response for this call
+
+    /**
+     * Creates a call stamped with the current time and no response yet.
+     * 
+     * @param id the client's call id
+     * @param param the parameter passed
+     * @param connection handler of the connection the call arrived on
+     */
+    public Call(int id, Writable param, ChannelInboundHandlerAdapter connection) {
+      this.id = id;
+      this.param = param;
+      this.connection = connection;
+      this.timestamp = System.currentTimeMillis();
+      this.response = null;
+    }
+
+    /**
+     * Describes the call as its parameter followed by the connection it came
+     * from.
+     */
+    @Override
+    public String toString() {
+      return param.toString() + " from " + connection.toString();
+    }
+
+    /**
+     * Stores the serialized response for this call.
+     * 
+     * @param response the response bytes
+     */
+    public void setResponse(ByteBuffer response) {
+      this.response = response;
+    }
+  }
+
+  /**
+   * ExceptionsHandler manages Exception groups for special handling e.g., terse
+   * exception group for concise logging messages
+   */
+  static class ExceptionsHandler {
+    private volatile Set<String> terseExceptions = new HashSet<String>();
+
+    /**
+     * Add exception class so server won't log its stack trace. The current
+     * set is copied, extended and then published through the volatile field,
+     * so readers always see a complete set. NOTE(review): this method takes
+     * no lock, so two concurrent callers can each copy the same old set and
+     * one addition may overwrite the other -- confirm callers are serialized.
+     * 
+     * @param exceptionClass exception classes
+     */
+    void addTerseExceptions(Class<?>... exceptionClass) {
+
+      // Make a copy of terseException for performing modification
+      final HashSet<String> newSet = new HashSet<String>(terseExceptions);
+
+      // Add the toString() form of every class into the HashSet
+      for (Class<?> name : exceptionClass) {
+        newSet.add(name.toString());
+      }
+      // Replace terseException set with an unmodifiable view of the copy
+      terseExceptions = Collections.unmodifiableSet(newSet);
+    }
+
+    /**
+     * Tells whether the given class was registered as terse.
+     * 
+     * @param t exception class to look up
+     * @return true if the class's toString() form is in the terse set
+     */
+    boolean isTerse(Class<?> t) {
+      return terseExceptions.contains(t.toString());
+    }
+  }
+
+  /**
+   * Called for each call. The value returned is serialized and sent back to
+   * the client; a thrown exception is reported to the client as an error.
+   * 
+   * @param protocol the protocol class resolved from the connection header;
+   *          left unset when the header named no protocol
+   * @param param the call parameter read from the client
+   * @param receiveTime the timestamp recorded when the call object was
+   *          created, in milliseconds
+   * @return Writable the result to send back
+   * @throws IOException
+   */
+  public abstract Writable call(Class<?> protocol, Writable param,
+      long receiveTime) throws IOException;
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
(added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
Tue Sep 30 00:44:14 2014
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.util.BSPNetUtils;
+
+public class TestHamaAsyncMessageManager extends TestCase {
+
+  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
+  // increment is here to solve race conditions in parallel execution to choose
+  // other ports.
+  public static volatile int increment = 1;
+
+  public void testMemoryMessaging() throws Exception {
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+        MessageQueue.class);
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    messagingInternal(conf);
+  }
+
+  public void testDiskMessaging() throws Exception {
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
+        MessageQueue.class);
+    messagingInternal(conf);
+  }
+
+  private static void messagingInternal(HamaConfiguration conf)
+      throws Exception {
+    conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
+        "org.apache.hama.bsp.message.HamaAsyncMessageManagerImpl");
+    MessageManager<IntWritable> messageManager = MessageManagerFactory
+        .getMessageManager(conf);
+
+    assertTrue(messageManager instanceof HamaAsyncMessageManagerImpl);
+
+    InetSocketAddress peer = new InetSocketAddress(
+        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
+            + (increment++));
+    conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
+    BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable,
NullWritable, NullWritable, IntWritable>(
+        conf, FileSystem.get(conf), new Counters());
+    TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
+    messageManager.init(id, dummyPeer, conf, peer);
+    peer = messageManager.getListenerAddress();
+    String peerName = peer.getHostName() + ":" + peer.getPort();
+    System.out.println("Peer is " + peerName);
+    messageManager.send(peerName, new IntWritable(1337));
+
+    Iterator<Entry<InetSocketAddress, BSPMessageBundle<IntWritable>>> messageIterator
= messageManager
+        .getOutgoingBundles();
+
+    Entry<InetSocketAddress, BSPMessageBundle<IntWritable>> entry = messageIterator
+        .next();
+
+    assertEquals(entry.getKey(), peer);
+
+    assertTrue(entry.getValue().size() == 1);
+
+    BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+    Iterator<IntWritable> it = entry.getValue().iterator();
+    while (it.hasNext()) {
+      bundle.addMessage(it.next());
+    }
+
+    messageManager.transfer(peer, bundle);
+
+    messageManager.clearOutgoingMessages();
+
+    assertTrue(messageManager.getNumCurrentMessages() == 1);
+    IntWritable currentMessage = messageManager.getCurrentMessage();
+
+    assertEquals(currentMessage.get(), 1337);
+    messageManager.close();
+  }
+}

Propchange: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java Tue Sep 30 00:44:14
2014
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Round-trip tests for {@link AsyncClient} against echoing {@link AsyncServer}
+ * instances: single calls to one server and batched calls to several addresses.
+ */
+public class TestAsyncIPC extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
+
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
+
+  public TestAsyncIPC(String name) {
+    super(name);
+  }
+
+  private static final Random RANDOM = new Random();
+
+  private static final String ADDRESS = "0.0.0.0";
+  /** Next port to bind; bumped each time a {@link TestServer} is created. */
+  private static int port = 7000;
+
+  /** Server that echoes each request, optionally after a random delay. */
+  private static class TestServer extends AsyncServer {
+    private final boolean sleep;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      super(ADDRESS, port++, LongWritable.class, handlerCount, conf);
+      this.sleep = sleep;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        try {
+          Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit
+        } catch (InterruptedException e) {
+          // Do not swallow the interrupt: restore the flag for the calling
+          // thread. The request is still echoed below.
+          Thread.currentThread().interrupt();
+        }
+      }
+      return param; // echo param as result
+    }
+  }
+
+  /**
+   * Thread that issues up to {@code count} single calls to one server and
+   * records a failure if a call throws or the reply differs from the request.
+   */
+  private static class SerialCaller extends Thread {
+    private final AsyncClient client;
+    private final InetSocketAddress serverAddress;
+    private final int count;
+    /** Read by the test thread only after {@code join()}. */
+    private boolean failed;
+
+    public SerialCaller(AsyncClient client, InetSocketAddress server, int count) {
+      this.client = client;
+      this.serverAddress = server;
+      this.count = count;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          LongWritable param = new LongWritable(RANDOM.nextLong());
+
+          LongWritable value = (LongWritable) client.call(param, serverAddress,
+              null, null, 0, conf);
+          if (!param.equals(value)) {
+            LOG.fatal("Call failed!");
+            failed = true;
+            return;
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+          // The verdict cannot change any more, so stop here just like the
+          // mismatch branch does instead of issuing the remaining calls.
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Thread that issues up to {@code count} batched calls, one parameter per
+   * address, and records a failure if a batch throws or any reply differs from
+   * its request.
+   */
+  private static class ParallelCaller extends Thread {
+    private final AsyncClient client;
+    private final int count;
+    private final InetSocketAddress[] addresses;
+    /** Read by the test thread only after {@code join()}. */
+    private boolean failed;
+
+    public ParallelCaller(AsyncClient client, InetSocketAddress[] addresses,
+        int count) {
+      this.client = client;
+      this.addresses = addresses;
+      this.count = count;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          Writable[] params = new Writable[addresses.length];
+          for (int j = 0; j < addresses.length; j++) {
+            params[j] = new LongWritable(RANDOM.nextLong());
+          }
+          Writable[] values = client.call(params, addresses, null, null, conf);
+
+          for (int j = 0; j < addresses.length; j++) {
+            if (!params[j].equals(values[j])) {
+              LOG.fatal("Call failed!");
+              failed = true;
+              // Leave run() entirely; a bare break would only exit this inner
+              // loop and the next batch would still be sent.
+              return;
+            }
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+          return;
+        }
+      }
+    }
+  }
+
+  public void testSerial() throws Exception {
+    testSerial(3, false, 2, 5, 100);
+  }
+
+  /**
+   * Starts one echo server and lets {@code callerCount} threads, sharing
+   * {@code clientCount} clients round-robin, each make {@code callCount} calls.
+   * Clients and server are stopped even when an assertion fails.
+   */
+  public void testSerial(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws Exception {
+    AsyncServer server = new TestServer(handlerCount, handlerSleep);
+    InetSocketAddress addr = server.getAddress();
+    server.start();
+
+    AsyncClient[] clients = new AsyncClient[clientCount];
+    try {
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new AsyncClient(LongWritable.class, conf);
+      }
+
+      SerialCaller[] callers = new SerialCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+        callers[i].start();
+      }
+      // Wait for every caller before asserting so that none of them is still
+      // using a client when the finally block stops it.
+      joinAll(callers);
+      for (int i = 0; i < callerCount; i++) {
+        assertFalse("serial caller " + i + " failed", callers[i].failed);
+      }
+    } finally {
+      stopClients(clients);
+      server.stop();
+    }
+  }
+
+  public void testParallel() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+  }
+
+  /**
+   * Starts {@code serverCount} echo servers, spreads {@code addressCount}
+   * target addresses over them round-robin and lets {@code callerCount}
+   * threads, sharing {@code clientCount} clients, each make {@code callCount}
+   * batched calls. Clients and servers are stopped even when an assertion
+   * fails.
+   */
+  public void testParallel(int handlerCount, boolean handlerSleep,
+      int serverCount, int addressCount, int clientCount, int callerCount,
+      int callCount) throws Exception {
+    AsyncServer[] servers = new AsyncServer[serverCount];
+    AsyncClient[] clients = new AsyncClient[clientCount];
+    try {
+      for (int i = 0; i < serverCount; i++) {
+        servers[i] = new TestServer(handlerCount, handlerSleep);
+        servers[i].start();
+      }
+
+      InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+      for (int i = 0; i < addressCount; i++) {
+        addresses[i] = servers[i % serverCount].address;
+      }
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new AsyncClient(LongWritable.class, conf);
+      }
+
+      ParallelCaller[] callers = new ParallelCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new ParallelCaller(clients[i % clientCount], addresses,
+            callCount);
+        callers[i].start();
+      }
+      // Wait for every caller before asserting so that none of them is still
+      // using a client when the finally block stops it.
+      joinAll(callers);
+      for (int i = 0; i < callerCount; i++) {
+        assertFalse("parallel caller " + i + " failed", callers[i].failed);
+      }
+    } finally {
+      stopClients(clients);
+      stopServers(servers);
+    }
+  }
+
+  /** Blocks until every given thread has terminated. */
+  private static void joinAll(Thread[] threads) throws InterruptedException {
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /** Stops every client that was created; unfilled slots are skipped. */
+  private static void stopClients(AsyncClient[] clients) throws Exception {
+    for (AsyncClient client : clients) {
+      if (client != null) {
+        client.stop();
+      }
+    }
+  }
+
+  /** Stops every server that was created; unfilled slots are skipped. */
+  private static void stopServers(AsyncServer[] servers) throws Exception {
+    for (AsyncServer server : servers) {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+}

Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Exercises {@link AsyncRPC}: proxy calls on {@link TestProtocol}, propagation
+ * of a server-side exception, and the static multi-call API.
+ */
+public class TestAsyncRPC extends TestCase {
+  private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
+
+  public static final Log LOG = LogFactory.getLog(TestAsyncRPC.class);
+
+  private static Configuration conf = new Configuration();
+
+  public TestAsyncRPC(String name) {
+    super(name);
+  }
+
+  /** Protocol served by {@link TestImpl} and called through the proxy. */
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    void ping() throws IOException;
+
+    String echo(String value) throws IOException;
+
+    String[] echo(String[] value) throws IOException;
+
+    Writable echo(Writable value) throws IOException;
+
+    int add(int v1, int v2) throws IOException;
+
+    int add(int[] values) throws IOException;
+
+    int error() throws IOException;
+
+    void testServerGet() throws IOException;
+  }
+
+  /** Server-side implementation: echoes, sums, or fails on demand. */
+  public class TestImpl implements TestProtocol {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+
+    @Override
+    public void ping() {
+    }
+
+    @Override
+    public String echo(String value) throws IOException {
+      return value;
+    }
+
+    @Override
+    public String[] echo(String[] values) throws IOException {
+      return values;
+    }
+
+    @Override
+    public Writable echo(Writable writable) {
+      return writable;
+    }
+
+    @Override
+    public int add(int v1, int v2) {
+      return v1 + v2;
+    }
+
+    @Override
+    public int add(int[] values) {
+      int sum = 0;
+      for (int value : values) {
+        sum += value;
+      }
+      return sum;
+    }
+
+    /** Always throws, so the test can check that failures reach the caller. */
+    @Override
+    public int error() throws IOException {
+      throw new IOException("bobo");
+    }
+
+    /** Fails unless {@code AsyncServer.get()} yields an AsyncRPC.NioServer. */
+    @Override
+    public void testServerGet() throws IOException {
+      AsyncServer server = AsyncServer.get();
+      if (!(server instanceof AsyncRPC.NioServer)) {
+        throw new IOException("AsyncServer.get() failed");
+      }
+    }
+  }
+
+  /**
+   * Starts an RPC server on {@code PORT}, exercises the protocol through a
+   * proxy and through the multi-call API, and always stops the server.
+   */
+  public void testCalls() throws Exception {
+    AsyncServer server = AsyncRPC
+        .getServer(new TestImpl(), ADDRESS, PORT, conf);
+    server.start();
+
+    // Always stop the server, even if an assertion below fails.
+    try {
+      InetSocketAddress addr = new InetSocketAddress(PORT);
+      TestProtocol proxy = (TestProtocol) AsyncRPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+
+      proxy.ping();
+
+      String stringResult = proxy.echo("foo");
+      assertEquals("foo", stringResult);
+
+      stringResult = proxy.echo((String) null);
+      assertNull(stringResult);
+
+      String[] stringResults = proxy.echo(new String[] { "foo", "bar" });
+      assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" }));
+
+      stringResults = proxy.echo((String[]) null);
+      assertNull(stringResults);
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+      intResult = proxy.add(new int[] { 1, 2 });
+      assertEquals(3, intResult);
+
+      boolean caught = false;
+      try {
+        proxy.error();
+      } catch (Exception e) {
+        LOG.debug("Caught " + e);
+        caught = true;
+      }
+      assertTrue("error() should have thrown", caught);
+
+      proxy.testServerGet();
+
+      // try some multi-calls
+      Method echo = TestProtocol.class.getMethod("echo", String.class);
+      String[] strings = (String[]) AsyncRPC.call(echo,
+          new String[][] { { "a" }, { "b" } },
+          new InetSocketAddress[] { addr, addr }, null, conf);
+      assertTrue(Arrays.equals(strings, new String[] { "a", "b" }));
+
+      Method ping = TestProtocol.class.getMethod("ping");
+      Object[] voids = AsyncRPC.call(ping, new Object[][] { {}, {} },
+          new InetSocketAddress[] { addr, addr }, null, conf);
+      assertNull(voids);
+    } finally {
+      server.stop();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestAsyncRPC("test").testCalls();
+  }
+
+}

Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message