Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F292917D79 for ; Tue, 30 Sep 2014 00:44:38 +0000 (UTC) Received: (qmail 16774 invoked by uid 500); 30 Sep 2014 00:44:38 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 16743 invoked by uid 500); 30 Sep 2014 00:44:38 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 16732 invoked by uid 99); 30 Sep 2014 00:44:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Sep 2014 00:44:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Sep 2014 00:44:35 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A1CF423889E2; Tue, 30 Sep 2014 00:44:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1628344 [2/2] - in /hama/trunk/core: ./ src/main/java/org/apache/hama/bsp/message/ src/main/java/org/apache/hama/ipc/ src/test/java/org/apache/hama/bsp/message/ src/test/java/org/apache/hama/ipc/ Date: Tue, 30 Sep 2014 00:44:15 -0000 To: commits@hama.apache.org From: bsmin@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140930004415.A1CF423889E2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java?rev=1628344&view=auto ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (added) +++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java Tue Sep 30 00:44:14 2014 @@ -0,0 +1,671 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.ReferenceCountUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +/** + * An abstract IPC service using netty. IPC calls take a single {@link Writable} + * as a parameter, and return a {@link Writable}* + * + * @see AsyncServer + */ +public abstract class AsyncServer { + + private AuthMethod authMethod; + static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + static int INITIAL_RESP_BUF_SIZE = 1024; + UserGroupInformation user = null; + // 1 : Introduce ping and server does not throw away RPCs + // 3 : Introduce the protocol into the RPC connection header + // 4 : Introduced SASL security layer + static final byte CURRENT_VERSION = 4; + static final int HEADER_LENGTH = 10; + // follows version is read. + private Configuration conf; + private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private int backlogLength;; + InetSocketAddress address; + private static final Log LOG = LogFactory.getLog(AsyncServer.class); + private static int NIO_BUFFER_LIMIT = 8 * 1024; + private final int maxRespSize; + static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size"; + static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024; + + private static final ThreadLocal SERVER = new ThreadLocal(); + private int port; // port we listen on + private Class paramClass; // class of call parameters + // Configure the server.(constructor is thread num) + private EventLoopGroup bossGroup = new NioEventLoopGroup(1); + private EventLoopGroup workerGroup = new NioEventLoopGroup(); + private static final Map> PROTOCOL_CACHE = new ConcurrentHashMap>(); + private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); + + static Class getProtocolClass(String protocolName, Configuration conf) + throws ClassNotFoundException { + Class protocol = PROTOCOL_CACHE.get(protocolName); + if (protocol == null) { + protocol = conf.getClassByName(protocolName); + PROTOCOL_CACHE.put(protocolName, protocol); + } + return protocol; + } + + /** + * Getting address + * + * @return InetSocketAddress + */ + public InetSocketAddress getAddress() { + return address; + } + + /** + * Returns the server instance called under or null. May be called under + * {@link #call(Writable, long)} implementations, and under {@link Writable} + * methods of paramters and return values. Permits applications to access the + * server context. + * + * @return NioServer + */ + public static AsyncServer get() { + return SERVER.get(); + } + + /** + * Constructs a server listening on the named port and address. Parameters + * passed must be of the named class. The + * handlerCount determines + * the number of handler threads that will be used to process calls. + * + * @param bindAddress + * @param port + * @param paramClass + * @param handlerCount + * @param conf + * @throws IOException + */ + protected AsyncServer(String bindAddress, int port, + Class paramClass, int handlerCount, Configuration conf) + throws IOException { + this(bindAddress, port, paramClass, handlerCount, conf, Integer + .toString(port), null); + } + + protected AsyncServer(String bindAddress, int port, + Class paramClass, int handlerCount, + Configuration conf, String serverName) throws IOException { + this(bindAddress, port, paramClass, handlerCount, conf, serverName, null); + } + + protected AsyncServer(String bindAddress, int port, + Class paramClass, int handlerCount, + Configuration conf, String serverName, + SecretManager secretManager) + throws IOException { + this.conf = conf; + this.port = port; + this.address = new InetSocketAddress(bindAddress, port); + this.paramClass = paramClass; + this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, + IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); + + this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true); + this.backlogLength = conf.getInt("ipc.server.listen.queue.size", 100); + } + + /** start server listener */ + public void start() { + new NioServerListener().start(); + } + + private class NioServerListener extends Thread { + + /** + * Configure and start nio server + */ + @Override + public void run() { + SERVER.set(AsyncServer.this); + try { + // ServerBootstrap is a helper class that sets up a server + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, backlogLength) + .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT) + .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024) + .childOption(ChannelOption.RCVBUF_ALLOCATOR, + new FixedRecvByteBufAllocator(100 * 1024)) + + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + // Register accumulation processing handler + p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0)); + // Register message processing handler + p.addLast(new NioServerInboundHandler()); + } + }); + + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(port).sync(); + LOG.info("AsyncServer startup"); + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // Shut down Server gracefully + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + } + + /** Stops the server gracefully. */ + public void stop() { + if (bossGroup != null && !bossGroup.isTerminated()) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null && !workerGroup.isTerminated()) { + workerGroup.shutdownGracefully(); + } + LOG.info("AsyncServer gracefully shutdown"); + } + + /** + * This class dynamically accumulate the recieved data by the value of the + * length field in the message + */ + public class NioFrameDecoder extends LengthFieldBasedFrameDecoder { + + /** + * @param maxFrameLength - the maximum length of the frame + * @param lengthFieldOffset - the offset of the length field + * @param lengthFieldLength - the length of the length field + * @param lengthAdjustment - the compensation value to add to the value of + * the length field + * @param initialBytesToStrip - the number of first bytes to strip out from + * the decoded frame + */ + public NioFrameDecoder(int maxFrameLength, int lengthFieldOffset, + int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, + lengthAdjustment, initialBytesToStrip); + } + + /** + * Decode(Accumulate) the from one ByteBuf to an other + * + * @param ctx + * @param in + */ + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) + throws Exception { + ByteBuf recvBuff = (ByteBuf) super.decode(ctx, in); + if (recvBuff == null) { + return null; + } + return recvBuff; + } + } + + /** + * This class process received message from client and send response message. + */ + private class NioServerInboundHandler extends ChannelInboundHandlerAdapter { + ConnectionHeader header = new ConnectionHeader(); + Class protocol; + private String errorClass = null; + private String error = null; + private boolean rpcHeaderRead = false; // if initial rpc header is read + private boolean headerRead = false; // if the connection header that follows + // version is read. + + /** + * Be invoked only one when a connection is established and ready to + * generate traffic + * + * @param ctx + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + SERVER.set(AsyncServer.this); + } + + /** + * Process a recieved message from client. This method is called with the + * received message, whenever new data is received from a client. + * + * @param ctx + * @param cause + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuffer dataLengthBuffer = ByteBuffer.allocate(4); + ByteBuf byteBuf = (ByteBuf) msg; + + ByteBuffer data = null; + ByteBuffer rpcHeaderBuffer = null; + try { + while (true) { + Call call = null; + errorClass = null; + error = null; + try { + if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) { + byteBuf.readBytes(dataLengthBuffer); + if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) { + return; + } + } else { + return; + } + + // read rpcHeader + if (!rpcHeaderRead) { + // Every connection is expected to send the header. + if (rpcHeaderBuffer == null) { + dataLengthBuffer = null; + dataLengthBuffer = ByteBuffer.allocate(4); + byteBuf.readBytes(dataLengthBuffer); + rpcHeaderBuffer = ByteBuffer.allocate(2); + } + byteBuf.readBytes(rpcHeaderBuffer); + if (!rpcHeaderBuffer.hasArray() + || rpcHeaderBuffer.remaining() > 0) { + return; + } + int version = rpcHeaderBuffer.get(0); + byte[] method = new byte[] { rpcHeaderBuffer.get(1) }; + try { + authMethod = AuthMethod.read(new DataInputStream( + new ByteArrayInputStream(method))); + dataLengthBuffer.flip(); + } catch (IOException ioe) { + errorClass = ioe.getClass().getName(); + error = StringUtils.stringifyException(ioe); + } + + if (!HEADER.equals(dataLengthBuffer) + || version != CURRENT_VERSION) { + LOG.warn("Incorrect header or version mismatch from " + + address.getHostName() + ":" + address.getPort() + + " got version " + version + " expected version " + + CURRENT_VERSION); + return; + } + dataLengthBuffer.clear(); + if (authMethod == null) { + throw new RuntimeException( + "Unable to read authentication method"); + } + rpcHeaderBuffer = null; + rpcHeaderRead = true; + continue; + } + + // read data length and allocate buffer; + if (data == null) { + dataLengthBuffer.flip(); + int dataLength = dataLengthBuffer.getInt(); + if (dataLength < 0) { + LOG.warn("Unexpected data length " + dataLength + "!! from " + + address.getHostName()); + } + data = ByteBuffer.allocate(dataLength); + } + + // read received data + byteBuf.readBytes(data); + if (data.remaining() == 0) { + dataLengthBuffer.clear(); + data.flip(); + boolean isHeaderRead = headerRead; + call = processOneRpc(data.array()); + data = null; + if (!isHeaderRead) { + continue; + } + } + } catch (OutOfMemoryError oome) { + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + // + LOG.warn("Out of Memory in server select", oome); + try { + Thread.sleep(60000); + errorClass = oome.getClass().getName(); + error = StringUtils.stringifyException(oome); + } catch (Exception ie) { + } + } catch (Exception e) { + LOG.warn("Exception in Responder " + + StringUtils.stringifyException(e)); + errorClass = e.getClass().getName(); + error = StringUtils.stringifyException(e); + } + sendResponse(ctx, call); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + + /** + * Send response data to client + * + * @param ctx + * @param call + */ + private void sendResponse(ChannelHandlerContext ctx, Call call) { + ByteArrayOutputStream buf = new ByteArrayOutputStream( + INITIAL_RESP_BUF_SIZE); + Writable value = null; + try { + value = call(protocol, call.param, call.timestamp); + } catch (Throwable e) { + String logMsg = this.getClass().getName() + ", call " + call + + ": error: " + e; + if (e instanceof RuntimeException || e instanceof Error) { + // These exception types indicate something is probably wrong + // on the server side, as opposed to just a normal exceptional + // result. + LOG.warn(logMsg, e); + } else if (exceptionsHandler.isTerse(e.getClass())) { + // Don't log the whole stack trace of these exceptions. + // Way too noisy! + LOG.info(logMsg); + } else { + LOG.info(logMsg, e); + } + errorClass = e.getClass().getName(); + error = StringUtils.stringifyException(e); + } + try { + setupResponse(buf, call, (error == null) ? Status.SUCCESS + : Status.ERROR, value, errorClass, error); + if (buf.size() > maxRespSize) { + LOG.warn("Large response size " + buf.size() + " for call " + + call.toString()); + buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + } + // send response data; + channelWrite(ctx, call.response); + } catch (Exception e) { + LOG.info(this.getClass().getName() + " caught: " + + StringUtils.stringifyException(e)); + error = null; + } finally { + IOUtils.closeStream(buf); + } + } + + /** + * read header or data + * + * @param buf + * @return + */ + private Call processOneRpc(byte[] buf) throws IOException { + if (headerRead) { + return processData(buf); + } else { + processHeader(buf); + headerRead = true; + return null; + } + } + + /** + * Reads the connection header following version + * + * @param buf buffer + */ + private void processHeader(byte[] buf) { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf)); + try { + header.readFields(in); + String protocolClassName = header.getProtocol(); + if (protocolClassName != null) { + protocol = getProtocolClass(header.getProtocol(), conf); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + IOUtils.closeStream(in); + } + + UserGroupInformation protocolUser = header.getUgi(); + user = protocolUser; + } + + /** + * + * Reads the received data, create call object; + * + * @param buf buffer to serialize the response into + * @return the IPC Call + */ + private Call processData(byte[] buf) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); + try { + int id = dis.readInt(); // try to read an id + + if (LOG.isDebugEnabled()) + LOG.debug(" got #" + id); + Writable param = ReflectionUtils.newInstance(paramClass, conf); + param.readFields(dis); // try to read param data + + Call call = new Call(id, param, this); + + return call; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + IOUtils.closeStream(dis); + } + } + } + + /** + * Setup response for the IPC Call. + * + * @param response buffer to serialize the response into + * @param call {@link Call} to which we are setting up the response + * @param status {@link Status} of the IPC call + * @param rv return value for the IPC Call, if the call was successful + * @param errorClass error class, if the the call failed + * @param error error message, if the call failed + * @throws IOException + */ + private void setupResponse(ByteArrayOutputStream response, Call call, + Status status, Writable rv, String errorClass, String error) + throws IOException { + response.reset(); + DataOutputStream out = new DataOutputStream(response); + out.writeInt(call.id); // write call id + out.writeInt(status.state); // write status + + if (status == Status.SUCCESS) { + rv.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + call.setResponse(ByteBuffer.wrap(response.toByteArray())); + IOUtils.closeStream(out); + } + + /** + * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}. If + * the amount of data is large, it writes to channel in smaller chunks. This + * is to avoid jdk from creating many direct buffers as the size of buffer + * increases. This also minimizes extra copies in NIO layer as a result of + * multiple write operations required to write a large buffer. + * + * @see WritableByteChannel#write(ByteBuffer) + * + * @param ctx + * @param buffer + */ + private void channelWrite(ChannelHandlerContext ctx, ByteBuffer buffer) { + try { + ByteBuf buf = ctx.alloc().buffer(); + buf.writeBytes(buffer.array()); + ctx.writeAndFlush(buf); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + /** A call queued for handling. */ + private static class Call { + private int id; // the client's call id + private Writable param; // the parameter passed + private ChannelInboundHandlerAdapter connection; // connection to client + private long timestamp; // the time received when response is null + // the time served when response is not null + private ByteBuffer response; // the response for this call + + /** + * + * @param id + * @param param + * @param connection + */ + public Call(int id, Writable param, ChannelInboundHandlerAdapter connection) { + this.id = id; + this.param = param; + this.connection = connection; + this.timestamp = System.currentTimeMillis(); + this.response = null; + } + + /** + * + */ + @Override + public String toString() { + return param.toString() + " from " + connection.toString(); + } + + /** + * + * @param response + */ + public void setResponse(ByteBuffer response) { + this.response = response; + } + } + + /** + * ExceptionsHandler manages Exception groups for special handling e.g., terse + * exception group for concise logging messages + */ + static class ExceptionsHandler { + private volatile Set terseExceptions = new HashSet(); + + /** + * Add exception class so server won't log its stack trace. Modifying the + * terseException through this method is thread safe. + * + * @param exceptionClass exception classes + */ + void addTerseExceptions(Class... exceptionClass) { + + // Make a copy of terseException for performing modification + final HashSet newSet = new HashSet(terseExceptions); + + // Add all class names into the HashSet + for (Class name : exceptionClass) { + newSet.add(name.toString()); + } + // Replace terseException set + terseExceptions = Collections.unmodifiableSet(newSet); + } + + /** + * + * @param t + * @return + */ + boolean isTerse(Class t) { + return terseExceptions.contains(t.toString()); + } + } + + /** + * Called for each call. + * + * @param protocol + * @param param + * @param receiveTime + * @return Writable + * @throws IOException + */ + public abstract Writable call(Class protocol, Writable param, + long receiveTime) throws IOException; +} Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1628344&view=auto ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (added) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Tue Sep 30 00:44:14 2014 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.Map.Entry; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSPMessageBundle; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.BSPPeerImpl; +import org.apache.hama.bsp.Counters; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.message.queue.DiskQueue; +import org.apache.hama.bsp.message.queue.MemoryQueue; +import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.util.BSPNetUtils; + +public class TestHamaAsyncMessageManager extends TestCase { + + public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue"; + // increment is here to solve race conditions in parallel execution to choose + // other ports. + public static volatile int increment = 1; + + public void testMemoryMessaging() throws Exception { + HamaConfiguration conf = new HamaConfiguration(); + conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class, + MessageQueue.class); + conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); + messagingInternal(conf); + } + + public void testDiskMessaging() throws Exception { + HamaConfiguration conf = new HamaConfiguration(); + conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); + conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class, + MessageQueue.class); + messagingInternal(conf); + } + + private static void messagingInternal(HamaConfiguration conf) + throws Exception { + conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, + "org.apache.hama.bsp.message.HamaAsyncMessageManagerImpl"); + MessageManager messageManager = MessageManagerFactory + .getMessageManager(conf); + + assertTrue(messageManager instanceof HamaAsyncMessageManagerImpl); + + InetSocketAddress peer = new InetSocketAddress( + BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort() + + (increment++)); + conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + + BSPPeer dummyPeer = new BSPPeerImpl( + conf, FileSystem.get(conf), new Counters()); + TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1); + messageManager.init(id, dummyPeer, conf, peer); + peer = messageManager.getListenerAddress(); + String peerName = peer.getHostName() + ":" + peer.getPort(); + System.out.println("Peer is " + peerName); + messageManager.send(peerName, new IntWritable(1337)); + + Iterator>> messageIterator = messageManager + .getOutgoingBundles(); + + Entry> entry = messageIterator + .next(); + + assertEquals(entry.getKey(), peer); + + assertTrue(entry.getValue().size() == 1); + + BSPMessageBundle bundle = new BSPMessageBundle(); + Iterator it = entry.getValue().iterator(); + while (it.hasNext()) { + bundle.addMessage(it.next()); + } + + messageManager.transfer(peer, bundle); + + messageManager.clearOutgoingMessages(); + + assertTrue(messageManager.getNumCurrentMessages() == 1); + IntWritable currentMessage = messageManager.getCurrentMessage(); + + assertEquals(currentMessage.get(), 1337); + messageManager.close(); + } +} Propchange: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java?rev=1628344&view=auto ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java (added) +++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java Tue Sep 30 00:44:14 2014 @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; + +public class TestAsyncIPC extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); + + final private static Configuration conf = new Configuration(); + final static private int PING_INTERVAL = 1000; + + static { + Client.setPingInterval(conf, PING_INTERVAL); + } + + public TestAsyncIPC(String name) { + super(name); + } + + private static final Random RANDOM = new Random(); + + private static final String ADDRESS = "0.0.0.0"; + private static int port = 7000; + + private static class TestServer extends AsyncServer { + private boolean sleep; + + public TestServer(int handlerCount, boolean sleep) throws IOException { + super(ADDRESS, port++, LongWritable.class, handlerCount, conf); + this.sleep = sleep; + } + + @Override + public Writable call(Class protocol, Writable param, long receiveTime) + throws IOException { + if (sleep) { + try { + Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit + } catch (InterruptedException e) { + } + } + return param; // echo param as result + } + } + + private static class SerialCaller extends Thread { + private AsyncClient client; + private InetSocketAddress serverAddress; + private int count; + private boolean failed; + + public SerialCaller(AsyncClient client, InetSocketAddress server, int count) { + this.client = client; + this.serverAddress = server; + this.count = count; + } + + @Override + public void run() { + for (int i = 0; i < count; i++) { + try { + LongWritable param = new LongWritable(RANDOM.nextLong()); + + LongWritable value = (LongWritable) client.call(param, serverAddress, + null, null, 0, conf); + if (!param.equals(value)) { + LOG.fatal("Call failed!"); + failed = true; + break; + } + + } catch (Exception e) { + LOG.fatal("Caught: " + StringUtils.stringifyException(e)); + failed = true; + } + } + } + } + + private static class ParallelCaller extends Thread { + private AsyncClient client; + private int count; + private InetSocketAddress[] addresses; + private boolean failed; + + public ParallelCaller(AsyncClient client, InetSocketAddress[] addresses, + int count) { + this.client = client; + this.addresses = addresses; + this.count = count; + } + + @Override + public void run() { + for (int i = 0; i < count; i++) { + try { + Writable[] params = new Writable[addresses.length]; + for (int j = 0; j < addresses.length; j++) + params[j] = new LongWritable(RANDOM.nextLong()); + Writable[] values = client.call(params, addresses, null, null, conf); + + for (int j = 0; j < addresses.length; j++) { + if (!params[j].equals(values[j])) { + LOG.fatal("Call failed!"); + failed = true; + break; + } + } + } catch (Exception e) { + LOG.fatal("Caught: " + StringUtils.stringifyException(e)); + failed = true; + } + } + } + } + + public void testSerial() throws Exception { + testSerial(3, false, 2, 5, 100); + } + + public void testSerial(int handlerCount, boolean handlerSleep, + int clientCount, int callerCount, int callCount) throws Exception { + AsyncServer server = new TestServer(handlerCount, handlerSleep); + InetSocketAddress addr = server.getAddress(); + server.start(); + + AsyncClient[] clients = new AsyncClient[clientCount]; + for (int i = 0; i < clientCount; i++) { + clients[i] = new AsyncClient(LongWritable.class, conf); + } + + SerialCaller[] callers = new SerialCaller[callerCount]; + for (int i = 0; i < callerCount; i++) { + callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount); + callers[i].start(); + } + for (int i = 0; i < callerCount; i++) { + callers[i].join(); + assertFalse(callers[i].failed); + } + for (int i = 0; i < clientCount; i++) { + clients[i].stop(); + } + server.stop(); + } + + public void testParallel() throws Exception { + testParallel(10, false, 2, 4, 2, 4, 100); + } + + public void testParallel(int handlerCount, boolean handlerSleep, + int serverCount, int addressCount, int clientCount, int callerCount, + int callCount) throws Exception { + AsyncServer[] servers = new AsyncServer[serverCount]; + for (int i = 0; i < serverCount; i++) { + servers[i] = new TestServer(handlerCount, handlerSleep); + servers[i].start(); + } + + InetSocketAddress[] addresses = new InetSocketAddress[addressCount]; + for (int i = 0; i < addressCount; i++) { + addresses[i] = servers[i % serverCount].address; + } + + AsyncClient[] clients = new AsyncClient[clientCount]; + for (int i = 0; i < clientCount; i++) { + clients[i] = new AsyncClient(LongWritable.class, conf); + } + + ParallelCaller[] callers = new ParallelCaller[callerCount]; + for (int i = 0; i < callerCount; i++) { + callers[i] = new ParallelCaller(clients[i % clientCount], addresses, + callCount); + callers[i].start(); + } + for (int i = 0; i < callerCount; i++) { + callers[i].join(); + assertFalse(callers[i].failed); + } + for (int i = 0; i < clientCount; i++) { + clients[i].stop(); + } + for (int i = 0; i < serverCount; i++) { + servers[i].stop(); + } + } +} Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java?rev=1628344&view=auto ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java (added) +++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java Tue Sep 30 00:44:14 2014 @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; + +public class TestAsyncRPC extends TestCase { + private static final int PORT = 1234; + private static final String ADDRESS = "0.0.0.0"; + + public static final Log LOG = LogFactory + .getLog("org.apache.hama.ipc.TestRPCWithNetty"); + + private static Configuration conf = new Configuration(); + + public TestAsyncRPC(String name) { + super(name); + } + + public interface TestProtocol extends VersionedProtocol { + public static final long versionID = 1L; + + void ping() throws IOException; + + String echo(String value) throws IOException; + + String[] echo(String[] value) throws IOException; + + Writable echo(Writable value) throws IOException; + + int add(int v1, int v2) throws IOException; + + int add(int[] values) throws IOException; + + int error() throws IOException; + + void testServerGet() throws IOException; + } + + public class TestImpl implements TestProtocol { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) { + return TestProtocol.versionID; + } + + @Override + public void ping() { + } + + @Override + public String echo(String value) throws IOException { + return value; + } + + @Override + public String[] echo(String[] values) throws IOException { + return values; + } + + @Override + public Writable echo(Writable writable) { + return writable; + } + + @Override + public int add(int v1, int v2) { + return v1 + v2; + } + + @Override + public int add(int[] values) { + int sum = 0; + for (int i = 0; i < values.length; i++) { + sum += values[i]; + } + return sum; + } + + @Override + public int error() throws IOException { + throw new IOException("bobo"); + } + + @Override + public void testServerGet() throws IOException { + AsyncServer server = AsyncServer.get(); + if (!(server instanceof AsyncRPC.NioServer)) { + throw new IOException("ServerWithNetty.get() failed"); + } + } + } + + public void testCalls() throws Exception { + AsyncServer server = AsyncRPC + .getServer(new TestImpl(), ADDRESS, PORT, conf); + server.start(); + + InetSocketAddress addr = new InetSocketAddress(PORT); + TestProtocol proxy = (TestProtocol) AsyncRPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + + proxy.ping(); + + String stringResult = proxy.echo("foo"); + assertEquals(stringResult, "foo"); + + stringResult = proxy.echo((String) null); + assertEquals(stringResult, null); + + String[] stringResults = proxy.echo(new String[] { "foo", "bar" }); + assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" })); + + stringResults = proxy.echo((String[]) null); + assertTrue(Arrays.equals(stringResults, null)); + + int intResult = proxy.add(1, 2); + assertEquals(intResult, 3); + + intResult = proxy.add(new int[] { 1, 2 }); + assertEquals(intResult, 3); + + boolean caught = false; + try { + proxy.error(); + } catch (Exception e) { + LOG.debug("Caught " + e); + caught = true; + } + assertTrue(caught); + + proxy.testServerGet(); + + // try some multi-calls + Method echo = TestProtocol.class.getMethod("echo", + new Class[] { String.class }); + String[] strings = (String[]) AsyncRPC.call(echo, new String[][] { { "a" }, + { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf); + assertTrue(Arrays.equals(strings, new String[] { "a", "b" })); + + Method ping = TestProtocol.class.getMethod("ping", new Class[] {}); + Object[] voids = AsyncRPC.call(ping, new Object[][] { {}, {} }, + new InetSocketAddress[] { addr, addr }, null, conf); + assertEquals(voids, null); + + server.stop(); + } + + public static void main(String[] args) throws Exception { + new TestAsyncRPC("test").testCalls(); + } + +} Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java ------------------------------------------------------------------------------ svn:mime-type = text/plain