hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1514580 [3/4] - in /hama/trunk: conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/ipc/ core/src/main/java/org/apache/hama/pipes/util/ core/src/main/java/org/apa...
Date Fri, 16 Aug 2013 05:20:16 GMT
Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/Server.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/Server.java?rev=1514580&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/Server.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/Server.java Fri Aug 16 05:20:15 2013
@@ -0,0 +1,1601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.ipc;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An abstract IPC service. IPC calls take a single {@link Writable} as a
+ * parameter, and return a {@link Writable} as their value. A service runs on a
+ * port and is defined by a parameter class and a value class.
+ * 
+ * @see Client
+ */
+public abstract class Server {
+  private final boolean authorize;
+  private boolean isSecurityEnabled;
+  private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
+
+  /**
+   * Adds the given exception classes to this server's terse exception group
+   * by delegating to its {@link ExceptionsHandler}.
+   * 
+   * @param exceptionClass exception classes to register
+   */
+  public void addTerseExceptions(Class<?>... exceptionClass) {
+    final ExceptionsHandler handler = this.exceptionsHandler;
+    handler.addTerseExceptions(exceptionClass);
+  }
+
+  /**
+   * ExceptionsHandler manages Exception groups for special handling e.g., terse
+   * exception group for concise logging messages
+   */
+  static class ExceptionsHandler {
+    // The set is never mutated in place: writers build a new set and swap the
+    // reference, so readers only need the volatile read.
+    private volatile Set<String> terseExceptions = new HashSet<String>();
+
+    /**
+     * Add exception class so server won't log its stack trace. Modifying the
+     * terseException through this method is thread safe: the method is
+     * synchronized so that two concurrent callers cannot both copy the same
+     * old set and overwrite each other's additions.
+     * 
+     * @param exceptionClass exception classes
+     */
+    synchronized void addTerseExceptions(Class<?>... exceptionClass) {
+
+      // Make a copy of terseException for performing modification
+      final HashSet<String> newSet = new HashSet<String>(terseExceptions);
+
+      // Add all class names into the HashSet
+      for (Class<?> name : exceptionClass) {
+        newSet.add(name.toString());
+      }
+      // Replace terseException set
+      terseExceptions = Collections.unmodifiableSet(newSet);
+    }
+
+    /**
+     * Tells whether the given class was registered as terse.
+     * 
+     * @param t the class to look up (matched by its {@code toString()} value)
+     * @return true if the class was added via {@link #addTerseExceptions}
+     */
+    boolean isTerse(Class<?> t) {
+      return terseExceptions.contains(t.toString());
+    }
+  }
+
+  /**
+   * The first four bytes of Hadoop RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+
+  // 1 : Introduce ping and server does not throw away RPCs
+  // 3 : Introduce the protocol into the RPC connection header
+  // 4 : Introduced SASL security layer
+  public static final byte CURRENT_VERSION = 4;
+
+  /**
+   * How many calls/handler are allowed in the queue.
+   */
+  private static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
+  private static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size";
+
+  /**
+   * Initial and max size of response buffer
+   */
+  static int INITIAL_RESP_BUF_SIZE = 10240;
+  static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
+  static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;
+
+  public static final Log LOG = LogFactory.getLog(Server.class);
+
+  private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
+
+  private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  static Class<?> getProtocolClass(String protocolName, Configuration conf)
+      throws ClassNotFoundException {
+    Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
+    if (protocol == null) {
+      protocol = conf.getClassByName(protocolName);
+      PROTOCOL_CACHE.put(protocolName, protocol);
+    }
+    return protocol;
+  }
+
+  /**
+   * Returns the server instance called under or null. May be called under
+   * {@link #call(Writable, long)} implementations, and under {@link Writable}
+   * methods of parameters and return values. Permits applications to access the
+   * server context.
+   */
+  public static Server get() {
+    return SERVER.get();
+  }
+
+  /**
+   * This is set to Call object before Handler invokes an RPC and reset after
+   * the call returns.
+   */
+  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+
+  /**
+   * Returns the remote side IP address when invoked inside an RPC. Returns
+   * null when no call is set for the current thread.
+   */
+  public static InetAddress getRemoteIp() {
+    final Call current = CurCall.get();
+    return (current == null) ? null : current.connection.socket
+        .getInetAddress();
+  }
+
+  /**
+   * Returns the remote address as a string when invoked inside an RPC.
+   * Returns null whenever {@link #getRemoteIp()} returns null.
+   */
+  public static String getRemoteAddress() {
+    final InetAddress remote = getRemoteIp();
+    if (remote == null) {
+      return null;
+    }
+    return remote.getHostAddress();
+  }
+
+  private String bindAddress;
+  private int port; // port we listen on
+  private int handlerCount; // number of handler threads
+  private int readThreads; // number of read threads
+  private Class<? extends Writable> paramClass; // class of call parameters
+  private int maxIdleTime; // the maximum idle time after
+                           // which a client may be disconnected
+  private int thresholdIdleConnections; // the number of idle connections
+                                        // after which we will start
+                                        // cleaning up idle
+                                        // connections
+  int maxConnectionsToNuke; // the max number of
+                            // connections to nuke
+                            // during a cleanup
+
+  private Configuration conf;
+
+  private int maxQueueSize;
+  private final int maxRespSize;
+  private int socketSendBufferSize;
+  private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+
+  volatile private boolean running = true; // true while server runs
+  private BlockingQueue<Call> callQueue; // queued calls
+
+  private List<Connection> connectionList = Collections
+      .synchronizedList(new LinkedList<Connection>());
+  // maintain a list
+  // of client connections
+  private Listener listener = null;
+  private Responder responder = null;
+  private int numConnections = 0;
+  private Handler[] handlers = null;
+
+  /**
+   * A convenience method to bind to a given address and report better
+   * exceptions if the address is not a valid host. The original exception is
+   * always attached as the cause of the rethrown one.
+   * 
+   * @param socket the socket to bind
+   * @param address the address to bind to
+   * @param backlog the number of connections allowed in the queue
+   * @throws BindException if the address can't be bound
+   * @throws UnknownHostException if the address isn't a valid host name
+   * @throws IOException other random errors from bind
+   */
+  public static void bind(ServerSocket socket, InetSocketAddress address,
+      int backlog) throws IOException {
+    try {
+      socket.bind(address, backlog);
+    } catch (BindException e) {
+      // Re-create the exception so that the message names the address.
+      BindException bindException = new BindException("Problem binding to "
+          + address + " : " + e.getMessage());
+      bindException.initCause(e);
+      throw bindException;
+    } catch (SocketException e) {
+      // If they try to bind to a different host's address, give a better
+      // error message.
+      if ("Unresolved address".equals(e.getMessage())) {
+        // UnknownHostException has no cause-taking constructor, so the
+        // underlying SocketException is attached through initCause.
+        UnknownHostException unknownHost = new UnknownHostException(
+            "Invalid hostname for server: " + address.getHostName());
+        unknownHost.initCause(e);
+        throw unknownHost;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /** A call queued for handling. */
+  private static class Call {
+    private int id; // the client's call id
+    private Writable param; // the parameter passed; may be null
+    private Connection connection; // connection to client
+    private long timestamp; // the time received when response is null
+                            // the time served when response is not null
+    private ByteBuffer response; // the response for this call
+
+    /**
+     * Creates a call stamped with the current time and no response yet.
+     * 
+     * @param id the client's call id
+     * @param param the parameter passed; may be null
+     * @param connection connection to the client
+     */
+    public Call(int id, Writable param, Connection connection) {
+      this.id = id;
+      this.param = param;
+      this.connection = connection;
+      this.timestamp = System.currentTimeMillis();
+      this.response = null;
+    }
+
+    /**
+     * Describes the call as "&lt;param&gt; from &lt;connection&gt;". Calls may
+     * be constructed with a null param, so the description is built
+     * null-safely instead of invoking {@code param.toString()} directly.
+     */
+    @Override
+    public String toString() {
+      return String.valueOf(param) + " from " + connection;
+    }
+
+    /**
+     * Stores the serialized response for this call.
+     * 
+     * @param response the buffer holding the response
+     */
+    public void setResponse(ByteBuffer response) {
+      this.response = response;
+    }
+  }
+
+  /** Listens on the socket. Creates jobs for the handler threads */
+  private class Listener extends Thread {
+
+    private ServerSocketChannel acceptChannel = null; // the accept channel
+    private Selector selector = null; // the selector that we use for the server
+    private Reader[] readers = null;
+    private int currentReader = 0;
+    private InetSocketAddress address; // the address we bind at
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; // the last time when a cleanup connec-
+                                         // -tion (for idle connections) ran
+    private long cleanupInterval = 10000; // the minimum interval between
+                                          // two cleanup runs
+    private int backlogLength = conf
+        .getInt("ipc.server.listen.queue.size", 128);
+
+    public Listener() throws IOException {
+      address = new InetSocketAddress(bindAddress, port);
+      // Create a new server socket and set to non blocking mode
+      acceptChannel = ServerSocketChannel.open();
+      acceptChannel.configureBlocking(false);
+
+      // Bind the server socket to the local host and port
+      bind(acceptChannel.socket(), address, backlogLength);
+      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral
+                                                    // port
+      // create a selector;
+      selector = Selector.open();
+      readers = new Reader[readThreads];
+      for (int i = 0; i < readThreads; i++) {
+        Selector readSelector = Selector.open();
+        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port "
+            + port, readSelector);
+        readers[i] = reader;
+        reader.start();
+      }
+
+      // Register accepts on the server socket with the selector.
+      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
+      this.setName("IPC Server listener on " + port);
+      this.setDaemon(true);
+    }
+
+    /**
+     * Thread that selects on its own readSelector and hands every readable
+     * key to doRead. New channels are registered by another thread through
+     * the startAdd / registerChannel / finishAdd sequence.
+     */
+    private class Reader extends Thread {
+      // true between startAdd() and finishAdd(); makes run() pause in wait()
+      private volatile boolean adding = false;
+      private Selector readSelector = null;
+
+      /**
+       * @param name thread name
+       * @param readSelector selector this reader selects on
+       */
+      Reader(String name, Selector readSelector) {
+        super(name);
+        this.readSelector = readSelector;
+      }
+
+      /**
+       * Select loop; runs while the enclosing server's running flag is true.
+       * The loop holds this Reader's monitor throughout, so the synchronized
+       * registerChannel and finishAdd can only enter while the loop is inside
+       * wait() below (wait releases the monitor).
+       */
+      public void run() {
+        LOG.info("Starting " + getName());
+        synchronized (this) {
+          while (running) {
+            SelectionKey key = null;
+            try {
+              readSelector.select();
+              // A registration is in progress: wait (releasing the monitor)
+              // until finishAdd() clears the flag; re-checked every second.
+              while (adding) {
+                this.wait(1000);
+              }
+
+              Iterator<SelectionKey> iter = readSelector.selectedKeys()
+                  .iterator();
+              while (iter.hasNext()) {
+                key = iter.next();
+                iter.remove();
+                if (key.isValid()) {
+                  if (key.isReadable()) {
+                    doRead(key);
+                  }
+                }
+                key = null;
+              }
+            } catch (InterruptedException e) {
+              // NOTE(review): the interrupt status is not restored here; the
+              // loop simply continues while running is still true.
+              if (running) { // unexpected -- log it
+                LOG.info(getName() + " caught: "
+                    + StringUtils.stringifyException(e));
+              }
+            } catch (IOException ex) {
+              LOG.error("Error in Reader", ex);
+            }
+          }
+        }
+      }
+
+      /**
+       * This gets reader into the state that waits for the new channel to be
+       * registered with readSelector. If it was waiting in select() the thread
+       * will be woken up, otherwise whenever select() is called it will return
+       * even if there is nothing to read and wait in while(adding) for
+       * finishAdd call
+       */
+      public void startAdd() {
+        adding = true;
+        readSelector.wakeup();
+      }
+
+      /**
+       * Registers the channel for OP_READ on this reader's selector.
+       * 
+       * @param channel the channel to register
+       * @return the selection key produced by the registration
+       * @throws IOException if the registration fails
+       */
+      public synchronized SelectionKey registerChannel(SocketChannel channel)
+          throws IOException {
+        return channel.register(readSelector, SelectionKey.OP_READ);
+      }
+
+      /** Clears the adding flag and notifies the thread waiting in run(). */
+      public synchronized void finishAdd() {
+        adding = false;
+        this.notify();
+      }
+
+      /**
+       * Wakes the selector and joins this thread. Asserts that the server's
+       * running flag is already false. If the join is interrupted, the
+       * calling thread's interrupt status is restored.
+       */
+      void shutdown() {
+        assert !running;
+        readSelector.wakeup();
+        try {
+          join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /**
+     * cleanup connections from connectionList. Choose a random range to scan
+     * and also have a limit on the number of the connections that will be
+     * cleanedup per run. The criteria for cleanup is the time for which the
+     * connection was idle. If 'force' is true then all connections will be
+     * looked at for the cleanup.
+     * 
+     * @param force when true, scan every connection, ignore the idle
+     *          threshold and the minimum interval between runs, and do not
+     *          cap the number of connections closed
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        int start = 0;
+        int end = numConnections - 1;
+        if (!force) {
+          // nextInt(bound) always yields a value in [0, bound). The former
+          // "nextInt() % bound" could be negative, which made the list lookup
+          // below fail and silently abandoned the whole cleanup run.
+          start = rand.nextInt(numConnections);
+          end = rand.nextInt(numConnections);
+          if (end < start) {
+            int temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c;
+          synchronized (connectionList) {
+            try {
+              c = connectionList.get(i);
+            } catch (Exception e) {
+              // The index is no longer valid for the list; give up this run.
+              return;
+            }
+          }
+          if (c.timedOut(currentTime)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": disconnecting client "
+                  + c.getHostAddress());
+            }
+            closeConnection(c);
+            numNuked++;
+            // Index i is looked at again on the next pass; only the scan
+            // window shrinks.
+            end--;
+            if (!force && numNuked == maxConnectionsToNuke) {
+              break;
+            }
+          } else {
+            i++;
+          }
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
+    }
+
+    /**
+     * Accept loop of the listener thread. While the server's running flag is
+     * true it selects on the accept selector, accepts pending connections and
+     * then runs a non-forced idle-connection cleanup. Once the flag is
+     * cleared it closes the accept channel and selector and closes every
+     * remaining connection.
+     */
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      // Make this server available through Server.get() on this thread.
+      SERVER.set(Server.this);
+      while (running) {
+        SelectionKey key = null;
+        try {
+          selector.select();
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid()) {
+                if (key.isAcceptable())
+                  doAccept(key);
+              }
+            } catch (IOException e) {
+              // NOTE(review): IOExceptions from doAccept are dropped here
+              // without any logging.
+            }
+            key = null;
+          }
+        } catch (OutOfMemoryError e) {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          LOG.warn("Out of Memory in server select", e);
+          closeCurrentConnection(key, e);
+          cleanupConnections(true);
+          try {
+            Thread.sleep(60000);
+          } catch (Exception ie) {
+            // NOTE(review): an interrupt during the sleep is ignored and the
+            // interrupt status is not restored.
+          }
+        } catch (Exception e) {
+          closeCurrentConnection(key, e);
+        }
+        cleanupConnections(false);
+      }
+      LOG.info("Stopping " + this.getName());
+
+      // Same monitor as the synchronized doStop().
+      synchronized (this) {
+        try {
+          acceptChannel.close();
+          selector.close();
+        } catch (IOException e) {
+          // Failures while closing are ignored; the references are dropped
+          // below either way.
+        }
+
+        selector = null;
+        acceptChannel = null;
+
+        // clean up all connections
+        while (!connectionList.isEmpty()) {
+          closeConnection(connectionList.remove(0));
+        }
+      }
+    }
+
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
+      if (key != null) {
+        Connection c = (Connection) key.attachment();
+        if (c != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": disconnecting client "
+                + c.getHostAddress());
+          closeConnection(c);
+          c = null;
+        }
+      }
+    }
+
+    InetSocketAddress getAddress() {
+      return (InetSocketAddress) acceptChannel.socket().getLocalSocketAddress();
+    }
+
+    /**
+     * Accepts every connection currently pending on the key's server channel.
+     * Each accepted channel is made non-blocking, handed to the next reader
+     * for OP_READ registration, wrapped in a Connection attached to the read
+     * key, and appended to connectionList.
+     * 
+     * @param key the acceptable key; its channel is the server socket channel
+     * @throws IOException if accepting, configuring or registering fails
+     */
+    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
+      Connection c = null;
+      ServerSocketChannel server = (ServerSocketChannel) key.channel();
+      SocketChannel channel;
+      // accept() returns null once no more connections are pending.
+      while ((channel = server.accept()) != null) {
+        channel.configureBlocking(false);
+        channel.socket().setTcpNoDelay(tcpNoDelay);
+        Reader reader = getReader();
+        try {
+          // startAdd() wakes the reader's selector and makes its loop pause,
+          // so the registration below can proceed; finishAdd() in the finally
+          // block resumes it.
+          reader.startAdd();
+          SelectionKey readKey = reader.registerChannel(channel);
+          c = new Connection(readKey, channel, System.currentTimeMillis());
+          readKey.attach(c);
+          synchronized (connectionList) {
+            connectionList.add(numConnections, c);
+            numConnections++;
+          }
+          if (LOG.isDebugEnabled())
+            LOG.debug("Server connection from " + c.toString()
+                + "; # active connections: " + numConnections
+                + "; # queued calls: " + callQueue.size());
+        } finally {
+          reader.finishAdd();
+        }
+        // NOTE(review): if the registration above throws, nothing in this
+        // method closes the accepted channel - confirm whether that leaks.
+
+      }
+    }
+
+    void doRead(SelectionKey key) throws InterruptedException {
+      int count = 0;
+      Connection c = (Connection) key.attachment();
+      if (c == null) {
+        return;
+      }
+      c.setLastContact(System.currentTimeMillis());
+
+      try {
+        count = c.readAndProcess();
+      } catch (InterruptedException ieo) {
+        LOG.info(getName() + ": readAndProcess caught InterruptedException",
+            ieo);
+        throw ieo;
+      } catch (Exception e) {
+        LOG.info(getName() + ": readAndProcess threw exception " + e
+            + ". Count of bytes read: " + count, e);
+        count = -1; // so that the (count < 0) block is executed
+      }
+      if (count < 0) {
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + ": disconnecting client " + c
+              + ". Number of active connections: " + numConnections);
+        closeConnection(c);
+        c = null;
+      } else {
+        c.setLastContact(System.currentTimeMillis());
+      }
+    }
+
+    synchronized void doStop() {
+      if (selector != null) {
+        selector.wakeup();
+        Thread.yield();
+      }
+      if (acceptChannel != null) {
+        try {
+          acceptChannel.socket().close();
+        } catch (IOException e) {
+          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+        }
+      }
+      for (Reader r : readers) {
+        r.shutdown();
+      }
+    }
+
+    // The method that will return the next reader to work with
+    // Simplistic implementation of round robin for now
+    Reader getReader() {
+      currentReader = (currentReader + 1) % readers.length;
+      return readers[currentReader];
+    }
+
+  }
+
+  // Sends responses of RPC back to clients.
+  private class Responder extends Thread {
+    private Selector writeSelector;
+    private int pending; // connections waiting to register
+
+    final static int PURGE_INTERVAL = 900000; // 15mins
+
+    /**
+     * Creates the responder as a named daemon thread with its own write
+     * selector and no pending registrations.
+     * 
+     * @throws IOException if the selector cannot be opened
+     */
+    Responder() throws IOException {
+      setName("IPC Server Responder");
+      setDaemon(true);
+      this.writeSelector = Selector.open(); // create a selector
+      this.pending = 0;
+    }
+
+    /**
+     * Responder loop. While the server's running flag is true it waits for
+     * in-progress registrations, selects on the write selector for at most
+     * PURGE_INTERVAL milliseconds, writes queued responses for writable keys,
+     * and, at most once per PURGE_INTERVAL, runs doPurge on the calls
+     * attached to the selector's keys.
+     */
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      // Make this server available through Server.get() on this thread.
+      SERVER.set(Server.this);
+      long lastPurgeTime = 0; // last check for old calls.
+
+      while (running) {
+        try {
+          waitPending(); // If a channel is being registered, wait.
+          writeSelector.select(PURGE_INTERVAL);
+          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            SelectionKey key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid() && key.isWritable()) {
+                doAsyncWrite(key);
+              }
+            } catch (IOException e) {
+              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+            }
+          }
+          // Skip the purge pass until a full PURGE_INTERVAL has elapsed
+          // since the previous one.
+          long now = System.currentTimeMillis();
+          if (now < lastPurgeTime + PURGE_INTERVAL) {
+            continue;
+          }
+          lastPurgeTime = now;
+          //
+          // If there were some calls that have not been sent out for a
+          // long time, discard them.
+          //
+          LOG.debug("Checking for old call responses.");
+          ArrayList<Call> calls;
+
+          // get the list of channels from list of keys.
+          synchronized (writeSelector.keys()) {
+            calls = new ArrayList<Call>(writeSelector.keys().size());
+            iter = writeSelector.keys().iterator();
+            while (iter.hasNext()) {
+              SelectionKey key = iter.next();
+              Call call = (Call) key.attachment();
+              // Keep only calls whose connection channel is the key's channel.
+              if (call != null && key.channel() == call.connection.channel) {
+                calls.add(call);
+              }
+            }
+          }
+
+          // Purge outside the key-set lock, working on the snapshot.
+          for (Call call : calls) {
+            try {
+              doPurge(call, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
+          }
+        } catch (OutOfMemoryError e) {
+          //
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          //
+          LOG.warn("Out of Memory in server select", e);
+          try {
+            Thread.sleep(60000);
+          } catch (Exception ie) {
+            // NOTE(review): an interrupt during the sleep is ignored and the
+            // interrupt status is not restored.
+          }
+        } catch (Exception e) {
+          LOG.warn("Exception in Responder "
+              + StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info("Stopping " + this.getName());
+    }
+
+    private void doAsyncWrite(SelectionKey key) throws IOException {
+      Call call = (Call) key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        throw new IOException("doAsyncWrite: bad channel");
+      }
+
+      synchronized (call.connection.responseQueue) {
+        if (processResponse(call.connection.responseQueue, false)) {
+          try {
+            key.interestOps(0);
+          } catch (CancelledKeyException e) {
+            /*
+             * The Listener/reader might have closed the socket. We don't
+             * explicitly cancel the key, so not sure if this will ever fire.
+             * This warning could be removed.
+             */
+            LOG.warn("Exception while changing ops : " + e);
+          }
+        }
+      }
+    }
+
+    //
+    // Remove calls that have been pending in the responseQueue
+    // for a long time.
+    //
+    private void doPurge(Call call, long now) throws IOException {
+      LinkedList<Call> responseQueue = call.connection.responseQueue;
+      synchronized (responseQueue) {
+        Iterator<Call> iter = responseQueue.listIterator(0);
+        while (iter.hasNext()) {
+          call = iter.next();
+          if (now > call.timestamp + PURGE_INTERVAL) {
+            closeConnection(call.connection);
+            break;
+          }
+        }
+      }
+    }
+
+    // Processes one response. Returns true if there are no more pending
+    // data for this channel.
+    //
+    // responseQueue: the response queue of one connection.
+    // inHandler: true when called from doRespond, false when called from
+    // doAsyncWrite. Only when true is a partially written call registered
+    // with the write selector.
+    //
+    // The 'error' flag starts out true and is cleared only on the paths that
+    // complete normally; any other exit (exception or early return) makes the
+    // finally block close the connection of the call being processed.
+    //
+    private boolean processResponse(LinkedList<Call> responseQueue,
+        boolean inHandler) throws IOException {
+      boolean error = true;
+      boolean done = false; // there is more data for this channel.
+      int numElements = 0;
+      Call call = null;
+      try {
+        synchronized (responseQueue) {
+          //
+          // If there are no items for this channel, then we are done
+          //
+          numElements = responseQueue.size();
+          if (numElements == 0) {
+            error = false;
+            return true; // no more data for this channel.
+          }
+          //
+          // Extract the first call
+          //
+          call = responseQueue.removeFirst();
+          SocketChannel channel = call.connection.channel;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": responding to #" + call.id + " from "
+                + call.connection);
+          }
+          //
+          // Send as much data as we can in the non-blocking fashion
+          //
+          int numBytes = channelWrite(channel, call.response);
+          if (numBytes < 0) {
+            // 'error' is still true here, so the finally block logs the
+            // failure and closes the connection.
+            return true;
+          }
+          if (!call.response.hasRemaining()) {
+            // The whole response has been written.
+            call.connection.decRpcCount();
+            if (numElements == 1) { // last call fully processes.
+              done = true; // no more data for this channel.
+            } else {
+              done = false; // more calls pending to be sent.
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from "
+                  + call.connection + " Wrote " + numBytes + " bytes.");
+            }
+          } else {
+            //
+            // If we were unable to write the entire response out, then
+            // insert in Selector queue.
+            //
+            call.connection.responseQueue.addFirst(call);
+
+            if (inHandler) {
+              // set the serve time when the response has to be sent later
+              call.timestamp = System.currentTimeMillis();
+
+              // incPending() makes the Responder loop block in waitPending()
+              // until decPending() runs in the finally block below.
+              incPending();
+              try {
+                // Wakeup the thread blocked on select, only then can the call
+                // to channel.register() complete.
+                writeSelector.wakeup();
+                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+              } catch (ClosedChannelException e) {
+                // Its ok. channel might be closed else where.
+                done = true;
+              } finally {
+                decPending();
+              }
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from "
+                  + call.connection + " Wrote partial " + numBytes + " bytes.");
+            }
+          }
+          error = false; // everything went off well
+        }
+      } finally {
+        if (error && call != null) {
+          LOG.warn(getName() + ", call " + call + ": output error");
+          done = true; // error. no more data for this channel.
+          closeConnection(call.connection);
+        }
+      }
+      return done;
+    }
+
+    //
+    // Enqueue a response from the application.
+    //
+    void doRespond(Call call) throws IOException {
+      synchronized (call.connection.responseQueue) {
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          processResponse(call.connection.responseQueue, true);
+        }
+      }
+    }
+
+    // A call is waiting to be enqueued: bump the pending counter.
+    private synchronized void incPending() {
+      pending += 1;
+    }
+
+    // A call is done enqueueing: lower the pending counter and notify a
+    // thread waiting on this Responder.
+    private synchronized void decPending() {
+      pending -= 1;
+      this.notify();
+    }
+
+    // Blocks until the pending counter is no longer positive.
+    private synchronized void waitPending() throws InterruptedException {
+      for (;;) {
+        if (pending <= 0) {
+          return;
+        }
+        this.wait();
+      }
+    }
+  }
+
+  /**
+   * Reads calls from a connection and queues them for handling.
+   * <p>
+   * One instance exists per client socket. {@link #readAndProcess()} consumes
+   * the RPC preamble (magic bytes, version, auth method), then the connection
+   * header, and afterwards one length-prefixed call per invocation; every
+   * decoded call is put on the server's call queue.
+   */
+  public class Connection {
+    private boolean rpcHeaderRead = false; // if initial rpc header is read
+    private boolean headerRead = false; // if the connection header that
+                                        // follows version is read.
+
+    private SocketChannel channel;
+    private ByteBuffer data; // payload of the frame being read; null between
+                             // frames
+    private ByteBuffer dataLengthBuffer; // 4 bytes: first the magic, later
+                                         // each frame's length prefix
+    private LinkedList<Call> responseQueue;
+    // NOTE(review): volatile only guarantees visibility; the ++/-- in
+    // incRpcCount()/decRpcCount() are not atomic. Confirm that increments and
+    // decrements cannot race, or switch to an atomic counter.
+    private volatile int rpcCount = 0; // number of outstanding rpcs
+    private long lastContact;
+    private int dataLength;
+    private Socket socket;
+    // Cache the remote host & port info so that even if the socket is
+    // disconnected, we can say where it used to connect to.
+    private String hostAddress;
+    private int remotePort;
+    private InetAddress addr;
+
+    ConnectionHeader header = new ConnectionHeader();
+    Class<?> protocol;
+    SaslServer saslServer;
+    private AuthMethod authMethod;
+    private boolean skipInitialSaslHandshake;
+    private ByteBuffer rpcHeaderBuffer;
+
+    UserGroupInformation user = null;
+    public UserGroupInformation attemptingUser = null; // user name before auth
+
+    // Fake 'call' for failed authorization response
+    // (identifier spelling kept as-is; it may be referenced outside this class)
+    private final int AUTHROIZATION_FAILED_CALLID = -1;
+    private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID,
+        null, this);
+    private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+    // Fake 'call' for SASL context setup
+    private static final int SASL_CALLID = -33;
+    private final Call saslCall = new Call(SASL_CALLID, null, this);
+    private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
+
+    private boolean useWrap = false;
+
+    /**
+     * Creates the per-socket state and caches the remote address and port.
+     *
+     * @param key selection key of the channel (not used by this constructor)
+     * @param channel the accepted client channel
+     * @param lastContact initial value for the last-contact timestamp
+     */
+    public Connection(SelectionKey key, SocketChannel channel, long lastContact) {
+      this.channel = channel;
+      this.lastContact = lastContact;
+      this.data = null;
+      this.dataLengthBuffer = ByteBuffer.allocate(4);
+      this.socket = channel.socket();
+      this.addr = socket.getInetAddress();
+      if (addr == null) {
+        this.hostAddress = "*Unknown*";
+      } else {
+        this.hostAddress = addr.getHostAddress();
+      }
+      this.remotePort = socket.getPort();
+      this.responseQueue = new LinkedList<Call>();
+      // A configured size of 0 means "leave the socket's default untouched".
+      if (socketSendBufferSize != 0) {
+        try {
+          socket.setSendBufferSize(socketSendBufferSize);
+        } catch (IOException e) {
+          LOG.warn("Connection: unable to set socket send buffer size to "
+              + socketSendBufferSize);
+        }
+      }
+    }
+
+    /** Returns the cached remote endpoint as {@code host:port}. */
+    @Override
+    public String toString() {
+      return getHostAddress() + ":" + remotePort;
+    }
+
+    /** Returns the cached remote host address, or "*Unknown*" if unavailable. */
+    public String getHostAddress() {
+      return hostAddress;
+    }
+
+    /** Returns the remote address captured at construction; may be null. */
+    public InetAddress getHostInetAddress() {
+      return addr;
+    }
+
+    public void setLastContact(long lastContact) {
+      this.lastContact = lastContact;
+    }
+
+    public long getLastContact() {
+      return lastContact;
+    }
+
+    /* Return true if the connection has no outstanding rpc */
+    private boolean isIdle() {
+      return rpcCount == 0;
+    }
+
+    /* Decrement the outstanding RPC count */
+    private void decRpcCount() {
+      rpcCount--;
+    }
+
+    /* Increment the outstanding RPC count */
+    private void incRpcCount() {
+      rpcCount++;
+    }
+
+    /*
+     * Return true if the connection is idle and has had no contact for longer
+     * than maxIdleTime, measured against the supplied current time.
+     */
+    private boolean timedOut(long currentTime) {
+      return isIdle() && currentTime - lastContact > maxIdleTime;
+    }
+
+    /**
+     * Serializes a SASL reply into the reusable SASL buffer and hands it to the
+     * responder via the fake SASL call.
+     *
+     * @param status status to write first
+     * @param rv value written when {@code status} is SUCCESS
+     * @param errorClass error class name written otherwise
+     * @param error error text written otherwise
+     */
+    private void doSaslReply(SaslStatus status, Writable rv, String errorClass,
+        String error) throws IOException {
+      saslResponse.reset();
+      DataOutputStream out = new DataOutputStream(saslResponse);
+      out.writeInt(status.state); // write status
+      if (status == SaslStatus.SUCCESS) {
+        rv.write(out);
+      } else {
+        WritableUtils.writeString(out, errorClass);
+        WritableUtils.writeString(out, error);
+      }
+      saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
+      responder.doRespond(saslCall);
+    }
+
+    /** Disposes the SASL server, if one was created; failures are ignored. */
+    private void disposeSasl() {
+      if (saslServer != null) {
+        try {
+          saslServer.dispose();
+        } catch (SaslException ignored) {
+          // best effort: the connection is being torn down anyway
+        }
+      }
+    }
+
+    /**
+     * Reads from the channel and processes at most one RPC.
+     *
+     * @return the byte count of the last channel read; 0 for a ping frame; -1
+     *         when the channel reported end-of-stream or the preamble carried
+     *         a wrong magic or version
+     * @throws IOException on read failure, an unreadable auth method, a
+     *           negative frame length, or a failed authorization
+     * @throws InterruptedException if queueing the decoded call is interrupted
+     */
+    public int readAndProcess() throws IOException, InterruptedException {
+      while (true) {
+        /*
+         * Read at most one RPC. If the header is not read completely yet then
+         * iterate until we read first RPC or until there is no data left.
+         */
+        int count = -1;
+        if (dataLengthBuffer.remaining() > 0) {
+          count = channelRead(channel, dataLengthBuffer);
+          if (count < 0 || dataLengthBuffer.remaining() > 0)
+            return count;
+        }
+
+        if (!rpcHeaderRead) {
+          // Every connection is expected to send the header.
+          if (rpcHeaderBuffer == null) {
+            rpcHeaderBuffer = ByteBuffer.allocate(2);
+          }
+          count = channelRead(channel, rpcHeaderBuffer);
+          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+            return count;
+          }
+          // byte 0 = protocol version, byte 1 = authentication method code
+          int version = rpcHeaderBuffer.get(0);
+          byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
+          authMethod = AuthMethod.read(new DataInputStream(
+              new ByteArrayInputStream(method)));
+          // At this point the 4-byte buffer holds the magic, not a length.
+          dataLengthBuffer.flip();
+          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+            // Warning is ok since this is not supposed to happen.
+            LOG.warn("Incorrect header or version mismatch from " + hostAddress
+                + ":" + remotePort + " got version " + version
+                + " expected version " + CURRENT_VERSION);
+            return -1;
+          }
+          dataLengthBuffer.clear();
+          if (authMethod == null) {
+            throw new IOException("Unable to read authentication method");
+          }
+          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+            AccessControlException ae = new AccessControlException(
+                "Authorization ("
+                    + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+                    + ") is enabled but authentication ("
+                    + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+                    + ") is configured as simple. Please configure another method "
+                    + "like kerberos or digest.");
+            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+                null, ae.getClass().getName(), ae.getMessage());
+            responder.doRespond(authFailedCall);
+            throw ae;
+          }
+          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+            doSaslReply(SaslStatus.SUCCESS, new IntWritable(
+                SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
+            authMethod = AuthMethod.SIMPLE;
+            // client has already sent the initial Sasl message and we
+            // should ignore it. Both client and server should fall back
+            // to simple auth from now on.
+            skipInitialSaslHandshake = true;
+          }
+
+          rpcHeaderBuffer = null;
+          rpcHeaderRead = true;
+          continue;
+        }
+
+        if (data == null) {
+          dataLengthBuffer.flip();
+          dataLength = dataLengthBuffer.getInt();
+
+          if (dataLength == Client.PING_CALL_ID) {
+            if (!useWrap) { // covers the !useSasl too
+              dataLengthBuffer.clear();
+              return 0; // ping message
+            }
+          }
+          if (dataLength < 0) {
+            LOG.warn("Unexpected data length " + dataLength + "!! from "
+                + getHostAddress());
+            // ByteBuffer.allocate() rejects a negative capacity with an
+            // unchecked IllegalArgumentException; fail the read with the
+            // declared checked exception instead.
+            throw new IOException("Unexpected data length " + dataLength
+                + " from " + getHostAddress());
+          }
+          data = ByteBuffer.allocate(dataLength);
+        }
+
+        count = channelRead(channel, data);
+
+        if (data.remaining() == 0) {
+          // Frame complete: make the length buffer ready for the next frame.
+          dataLengthBuffer.clear();
+          data.flip();
+          if (skipInitialSaslHandshake) {
+            data = null;
+            skipInitialSaslHandshake = false;
+            continue;
+          }
+          // Capture the flag first: processOneRpc() sets headerRead when the
+          // frame was the connection header.
+          boolean isHeaderRead = headerRead;
+          processOneRpc(data.array());
+          data = null;
+          if (!isHeaderRead) {
+            // Only the header was consumed; loop to try reading a real call.
+            continue;
+          }
+        }
+        return count;
+      }
+    }
+
+    /**
+     * Reads the connection header following version, resolves the protocol
+     * class it names (if any) and records the user it carries.
+     *
+     * @throws IOException if the header is unreadable or names an unknown
+     *           protocol class
+     */
+    private void processHeader(byte[] buf) throws IOException {
+      DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
+      header.readFields(in);
+      try {
+        String protocolClassName = header.getProtocol();
+        if (protocolClassName != null) {
+          protocol = getProtocolClass(protocolClassName, conf);
+        }
+      } catch (ClassNotFoundException cnfe) {
+        // keep the original failure as the cause for diagnosis
+        throw new IOException("Unknown protocol: " + header.getProtocol(),
+            cnfe);
+      }
+
+      UserGroupInformation protocolUser = header.getUgi();
+      user = protocolUser;
+
+      /*
+      if (user != null) {
+        user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+      }
+      */
+    }
+
+    /**
+     * Dispatches one complete frame: the first frame is the connection header
+     * (followed by authorization), every later frame is a call.
+     */
+    private void processOneRpc(byte[] buf) throws IOException,
+        InterruptedException {
+      if (headerRead) {
+        processData(buf);
+      } else {
+        processHeader(buf);
+        headerRead = true;
+        if (!authorizeConnection()) {
+          throw new AccessControlException("Connection from " + this
+              + " for protocol " + header.getProtocol()
+              + " is unauthorized for user " + user);
+        }
+      }
+    }
+
+    /**
+     * Decodes a call (id followed by the parameter) and puts it on the call
+     * queue, then counts it as outstanding.
+     */
+    private void processData(byte[] buf) throws IOException,
+        InterruptedException {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
+      int id = dis.readInt(); // try to read an id
+
+      if (LOG.isDebugEnabled())
+        LOG.debug(" got #" + id);
+
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);// read
+                                                                     // param
+      param.readFields(dis);
+
+      Call call = new Call(id, param, this);
+      callQueue.put(call); // queue the call; maybe blocked here
+      incRpcCount(); // Increment the rpc count
+    }
+
+    /**
+     * Authorizes the connection's user for the requested protocol.
+     *
+     * @return true when authorized; false after a FATAL response has been
+     *         queued for the client because authorization failed
+     */
+    private boolean authorizeConnection() throws IOException {
+      try {
+        // If auth method is DIGEST, the token was obtained by the
+        // real user for the effective user, therefore not required to
+        // authorize real user. doAs is allowed only for simple or kerberos
+        // authentication
+        if (user != null && user.getRealUser() != null
+            && (authMethod != AuthMethod.DIGEST)) {
+          ProxyUsers.authorize(user, this.getHostAddress(), conf);
+        }
+        authorize(user, header, getHostInetAddress());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Successfully authorized " + header);
+        }
+      } catch (AuthorizationException ae) {
+        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+            ae.getClass().getName(), ae.getMessage());
+        responder.doRespond(authFailedCall);
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Releases the SASL server and read buffers and closes the channel and
+     * socket. Every close step is best effort: failures are ignored so the
+     * remaining steps still run.
+     */
+    private synchronized void close() throws IOException {
+      disposeSasl();
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      try {
+        socket.shutdownOutput();
+      } catch (Exception ignored) {
+        // best effort: continue with closing the channel
+      }
+      if (channel.isOpen()) {
+        try {
+          channel.close();
+        } catch (Exception ignored) {
+          // best effort: continue with closing the socket
+        }
+      }
+      try {
+        socket.close();
+      } catch (Exception ignored) {
+        // best effort: nothing left to release
+      }
+    }
+  }
+
+  /**
+   * Handles queued calls: a daemon thread that repeatedly takes a call from
+   * the call queue, invokes it (as the connection's user when one is set),
+   * serializes the result or the error, and hands the response to the
+   * responder.
+   */
+  private class Handler extends Thread {
+    /**
+     * @param instanceNumber index of this handler, used only in the thread name
+     */
+    public Handler(int instanceNumber) {
+      this.setDaemon(true);
+      this.setName("IPC Server handler " + instanceNumber + " on " + port);
+    }
+
+    /** Processes calls until the server's {@code running} flag is cleared. */
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
+      // Response buffer reused across calls by this handler thread.
+      ByteArrayOutputStream buf = new ByteArrayOutputStream(
+          INITIAL_RESP_BUF_SIZE);
+      while (running) {
+        try {
+          final Call call = callQueue.take(); // pop the queue; maybe blocked
+                                              // here
+
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": has #" + call.id + " from "
+                + call.connection);
+
+          String errorClass = null;
+          String error = null;
+          Writable value = null;
+
+          CurCall.set(call);
+          try {
+            // Make the call as the user via Subject.doAs, thus associating
+            // the call with the Subject
+            if (call.connection.user == null) {
+              value = call(call.connection.protocol, call.param, call.timestamp);
+            } else {
+              value = call.connection.user
+                  .doAs(new PrivilegedExceptionAction<Writable>() {
+                    @Override
+                    public Writable run() throws Exception {
+                      // make the call
+                      return call(call.connection.protocol, call.param,
+                          call.timestamp);
+
+                    }
+                  });
+            }
+          } catch (Throwable e) {
+            // Any failure of the call is reported back to the client as an
+            // ERROR response; only the log verbosity differs by type.
+            String logMsg = getName() + ", call " + call + ": error: " + e;
+            if (e instanceof RuntimeException || e instanceof Error) {
+              // These exception types indicate something is probably wrong
+              // on the server side, as opposed to just a normal exceptional
+              // result.
+              LOG.warn(logMsg, e);
+            } else if (exceptionsHandler.isTerse(e.getClass())) {
+              // Don't log the whole stack trace of these exceptions.
+              // Way too noisy!
+              LOG.info(logMsg);
+            } else {
+              LOG.info(logMsg, e);
+            }
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
+          }
+          CurCall.set(null);
+          synchronized (call.connection.responseQueue) {
+            // setupResponse() needs to be sync'ed together with
+            // responder.doResponse() since setupResponse may use
+            // SASL to encrypt response data and SASL enforces
+            // its own message ordering.
+            setupResponse(buf, call, (error == null) ? Status.SUCCESS
+                : Status.ERROR, value, errorClass, error);
+            // Discard the large buf and reset it back to
+            // smaller size to freeup heap
+            if (buf.size() > maxRespSize) {
+              LOG.warn("Large response size " + buf.size() + " for call "
+                  + call.toString());
+              buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
+            }
+            responder.doRespond(call);
+          }
+        } catch (InterruptedException e) {
+          // stop() interrupts handlers after clearing 'running'; the loop
+          // condition then ends this thread.
+          if (running) { // unexpected -- log it
+            LOG.info(getName() + " caught: "
+                + StringUtils.stringifyException(e));
+          }
+        } catch (Exception e) {
+          // Keep the handler alive: log and move on to the next call.
+          LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info(getName() + ": exiting");
+    }
+
+  }
+
+  /**
+   * Convenience constructor: delegates to the full constructor with the port
+   * number rendered as a string for the server name and no secret manager.
+   *
+   * @throws IOException propagated from the full constructor
+   */
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount, Configuration conf)
+      throws IOException {
+    this(bindAddress, port, paramClass, handlerCount, conf, Integer
+        .toString(port), null);
+  }
+
+  /**
+   * Convenience constructor: delegates to the full constructor with the given
+   * server name and no secret manager.
+   *
+   * @throws IOException propagated from the full constructor
+   */
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount,
+      Configuration conf, String serverName) throws IOException {
+    this(bindAddress, port, paramClass, handlerCount, conf, serverName, null);
+  }
+
+  /**
+   * Constructs a server listening on the named port and address. Parameters
+   * passed must be of the named class. The <code>handlerCount</code>
+   * determines the number of handler threads that will be used to process
+   * calls.
+   * <p>
+   * The listener and responder are created here, but no thread is started
+   * until {@link #start()} is called.
+   *
+   * @param bindAddress address to bind to
+   * @param port requested port; replaced by the listener's actual port
+   * @param paramClass class of the call parameters
+   * @param handlerCount number of handler threads
+   * @param conf configuration supplying queue sizes, limits and timeouts
+   * @param serverName not referenced by this constructor body
+   * @param secretManager not referenced by this constructor body
+   * @throws IOException propagated from creating the listener or responder
+   */
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> paramClass, int handlerCount,
+      Configuration conf, String serverName,
+      SecretManager<? extends TokenIdentifier> secretManager)
+      throws IOException {
+    this.bindAddress = bindAddress;
+    this.conf = conf;
+    this.port = port;
+    this.paramClass = paramClass;
+    this.handlerCount = handlerCount;
+    // 0 = do not override the socket's default send buffer size
+    this.socketSendBufferSize = 0;
+    // total queue capacity scales with the number of handlers
+    this.maxQueueSize = handlerCount
+        * conf.getInt(IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+            IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+    this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
+        IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+    this.readThreads = conf.getInt(IPC_SERVER_RPC_READ_THREADS_KEY,
+        IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+    this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
+    // server-side idle limit is twice the client's configured value
+    this.maxIdleTime = 2 * conf.getInt("ipc.client.connection.maxidletime",
+        1000);
+    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold",
+        4000);
+    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+
+    // Start the listener here and let it bind to the port
+    listener = new Listener();
+    // record the port the listener actually reports
+    this.port = listener.getAddress().getPort();
+    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
+
+    // Create the responder here
+    responder = new Responder();
+
+    if (isSecurityEnabled) {
+      SaslRpcServer.init(conf);
+    }
+  }
+
+  /**
+   * Removes the connection from the tracked connection list (adjusting the
+   * open-connection count if it was present) and then closes it.
+   *
+   * @param connection the connection to drop
+   */
+  private void closeConnection(Connection connection) {
+    synchronized (connectionList) {
+      boolean wasTracked = connectionList.remove(connection);
+      if (wasTracked) {
+        numConnections--;
+      }
+    }
+    try {
+      connection.close();
+    } catch (IOException ignored) {
+      // closing is best effort; the connection is already untracked
+    }
+  }
+
+  /**
+   * Serializes the response for an IPC call and attaches it to the call.
+   * <p>
+   * The buffer is reset, then the call id and status are written, followed by
+   * either the return value (on success) or the error class and message.
+   *
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param status {@link Status} of the IPC call
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the call failed
+   * @param error error message, if the call failed
+   * @throws IOException if serialization fails
+   */
+  private void setupResponse(ByteArrayOutputStream response, Call call,
+      Status status, Writable rv, String errorClass, String error)
+      throws IOException {
+    response.reset();
+    final DataOutputStream sink = new DataOutputStream(response);
+    sink.writeInt(call.id); // write call id
+    sink.writeInt(status.state); // write status
+
+    if (status != Status.SUCCESS) {
+      WritableUtils.writeString(sink, errorClass);
+      WritableUtils.writeString(sink, error);
+    } else {
+      rv.write(sink);
+    }
+    byte[] serialized = response.toByteArray();
+    call.setResponse(ByteBuffer.wrap(serialized));
+  }
+
+  /** Returns the configuration this server was constructed with. */
+  Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * For unit testing only; should be called before the server is started.
+   * Clears the security-enabled flag that was initialised in the constructor.
+   */
+  void disableSecurity() {
+    this.isSecurityEnabled = false;
+  }
+
+  /**
+   * For unit testing only; should be called before the server is started.
+   * Sets the security-enabled flag that was initialised in the constructor.
+   */
+  void enableSecurity() {
+    this.isSecurityEnabled = true;
+  }
+
+  /**
+   * Sets the socket buffer size used for responding to RPCs.
+   * <p>
+   * The value is applied when a {@link Connection} is constructed; a value of
+   * 0 leaves the socket's send buffer size unchanged.
+   *
+   * @param size send buffer size passed to the socket, or 0 for the default
+   */
+  public void setSocketSendBufSize(int size) {
+    this.socketSendBufferSize = size;
+  }
+
+  /**
+   * Starts the service. Must be called before any calls will be handled.
+   * <p>
+   * Starts the responder and listener threads, then creates and starts the
+   * configured number of handler threads.
+   */
+  public synchronized void start() {
+    responder.start();
+    listener.start();
+
+    handlers = new Handler[handlerCount];
+    for (int idx = 0; idx < handlerCount; idx++) {
+      Handler worker = new Handler(idx);
+      handlers[idx] = worker;
+      worker.start();
+    }
+  }
+
+  /**
+   * Stops the service. No new calls will be handled after this is called.
+   * <p>
+   * Clears the running flag, interrupts every handler that was created, stops
+   * the listener, interrupts the responder and wakes threads blocked in
+   * {@link #join()}.
+   */
+  public synchronized void stop() {
+    LOG.info("Stopping server on " + port);
+    running = false;
+    // handlers is only allocated by start(); it is null if never started
+    if (handlers != null) {
+      for (int idx = 0; idx < handlerCount; idx++) {
+        Handler worker = handlers[idx];
+        if (worker == null) {
+          continue;
+        }
+        worker.interrupt();
+      }
+    }
+    listener.interrupt();
+    listener.doStop();
+    responder.interrupt();
+    notifyAll();
+  }
+
+  /**
+   * Wait for the server to be stopped. Does not wait for all subthreads to
+   * finish. See {@link #stop()}.
+   * <p>
+   * Blocks on this object's monitor until the running flag is cleared;
+   * {@link #stop()} clears the flag and calls {@code notifyAll()}.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while
+   *           waiting
+   */
+  public synchronized void join() throws InterruptedException {
+    while (running) {
+      wait();
+    }
+  }
+
+  /**
+   * Return the socket address (ip+port) on which the RPC server is listening.
+   *
+   * @return the address reported by the listener
+   */
+  public synchronized InetSocketAddress getListenerAddress() {
+    return listener.getAddress();
+  }
+
+  /**
+   * Called for each call. Delegates to {@link #call(Class, Writable, long)}
+   * with a {@code null} protocol.
+   *
+   * @param param the call parameter
+   * @param receiveTime timestamp passed through to the delegate
+   * @return the value returned by the delegate
+   * @throws IOException propagated from the delegate
+   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   */
+  @Deprecated
+  public Writable call(Writable param, long receiveTime) throws IOException {
+    return call(null, param, receiveTime);
+  }
+
+  /**
+   * Called for each call. Handler threads invoke this with the protocol class
+   * of the connection, the decoded parameter and the call's timestamp.
+   *
+   * @param protocol protocol class of the connection; may be {@code null}
+   * @param param the call parameter
+   * @param receiveTime timestamp associated with the call
+   * @return the value to send back to the client
+   * @throws IOException if the call fails
+   */
+  public abstract Writable call(Class<?> protocol, Writable param,
+      long receiveTime) throws IOException;
+
+  /**
+   * Authorize the incoming client connection. Does nothing unless service
+   * authorization was enabled in the configuration.
+   *
+   * @param user client user
+   * @param connection header of the incoming connection, naming the protocol
+   * @param addr InetAddress of incoming connection
+   * @throws AuthorizationException when the protocol class is unknown or the
+   *           client isn't authorized to talk the protocol
+   */
+  @SuppressWarnings("static-access")
+  public void authorize(UserGroupInformation user, ConnectionHeader connection,
+      InetAddress addr) throws AuthorizationException {
+    if (!authorize) {
+      return;
+    }
+
+    Class<?> requestedProtocol = null;
+    try {
+      requestedProtocol = getProtocolClass(connection.getProtocol(), getConf());
+    } catch (ClassNotFoundException cfne) {
+      throw new AuthorizationException("Unknown protocol: "
+          + connection.getProtocol());
+    }
+
+    ServiceAuthorizationManager manager = new ServiceAuthorizationManager();
+    manager.authorize(user, requestedProtocol, getConf(), addr);
+  }
+
+  /**
+   * The number of open RPC connections.
+   *
+   * @return the number of open rpc connections
+   */
+  public int getNumOpenConnections() {
+    return numConnections;
+  }
+
+  /**
+   * The number of rpc calls in the queue, i.e. calls that have been read but
+   * not yet taken by a handler.
+   *
+   * @return The number of rpc calls in the queue.
+   */
+  public int getCallQueueLen() {
+    return callQueue.size();
+  }
+
+  /**
+   * When the read or write buffer size is larger than this limit, i/o will be
+   * done in chunks of this size. Most RPC requests and responses would be
+   * smaller.
+   */
+  private static int NIO_BUFFER_LIMIT = 8 * 1024; // should not be more than
+                                                  // 64KB.
+
+  /**
+   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}. If
+   * the amount of data is large, it writes to channel in smaller chunks. This
+   * is to avoid jdk from creating many direct buffers as the size of buffer
+   * increases. This also minimizes extra copies in NIO layer as a result of
+   * multiple write operations required to write a large buffer.
+   *
+   * @param channel channel to write to
+   * @param buffer data to write; its position advances by the bytes written
+   * @return the result of the direct write, or of the chunked helper
+   * @throws IOException if the underlying write fails
+   * @see WritableByteChannel#write(ByteBuffer)
+   */
+  private int channelWrite(WritableByteChannel channel, ByteBuffer buffer)
+      throws IOException {
+    // Small enough: hand the whole buffer to the channel in one call.
+    if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
+      return channel.write(buffer);
+    }
+    return channelIO(null, channel, buffer);
+  }
+
+  /**
+   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}. If
+   * the amount of data is large, it reads from the channel in smaller chunks.
+   * This is to avoid jdk from creating many direct buffers as the size of
+   * ByteBuffer increases. There should not be any performance degradation.
+   *
+   * @param channel channel to read from
+   * @param buffer destination; its position advances by the bytes read
+   * @return the result of the direct read, or of the chunked helper
+   * @throws IOException if the underlying read fails
+   * @see ReadableByteChannel#read(ByteBuffer)
+   */
+  private int channelRead(ReadableByteChannel channel, ByteBuffer buffer)
+      throws IOException {
+    // Small enough: let the channel fill the buffer in one call.
+    if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
+      return channel.read(buffer);
+    }
+    return channelIO(channel, null, buffer);
+  }
+
+  /**
+   * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)} and
+   * {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only one of readCh
+   * or writeCh should be non-null.
+   * <p>
+   * Transfers the buffer in slices of at most {@code NIO_BUFFER_LIMIT} bytes
+   * by temporarily lowering the buffer's limit, and stops at the first slice
+   * that is not transferred completely.
+   *
+   * @param readCh channel to read from, or null when writing
+   * @param writeCh channel to write to; used only when readCh is null
+   * @param buf buffer to fill or drain; its limit is restored before return
+   * @return the total number of bytes transferred if positive, otherwise the
+   *         result of the last channel operation
+   * @throws IOException if the underlying channel operation fails
+   * @see #channelRead(ReadableByteChannel, ByteBuffer)
+   * @see #channelWrite(WritableByteChannel, ByteBuffer)
+   */
+  private static int channelIO(ReadableByteChannel readCh,
+      WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
+
+    final int savedLimit = buf.limit();
+    final int remainingAtStart = buf.remaining();
+    int lastResult = 0;
+
+    while (buf.remaining() > 0) {
+      final int sliceSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+      boolean shortTransfer;
+      try {
+        // expose only the current slice to the channel
+        buf.limit(buf.position() + sliceSize);
+        if (readCh != null) {
+          lastResult = readCh.read(buf);
+        } else {
+          lastResult = writeCh.write(buf);
+        }
+        shortTransfer = lastResult < sliceSize;
+      } finally {
+        // always restore the caller's limit, even if the channel threw
+        buf.limit(savedLimit);
+      }
+      if (shortTransfer) {
+        break;
+      }
+    }
+
+    final int transferred = remainingAtStart - buf.remaining();
+    if (transferred > 0) {
+      return transferred;
+    }
+    return lastResult;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/Status.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/Status.java?rev=1514580&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/Status.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/Status.java Fri Aug 16 05:20:15 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+/**
+ * Status of a Hadoop IPC call.
+ * <p>
+ * Each constant carries the integer code that represents it; the code is
+ * fixed at construction and cannot be changed afterwards.
+ */
+enum Status {
+  SUCCESS (0),
+  ERROR (1),
+  FATAL (-1);
+  
+  /** Integer code of this status; immutable because constants are shared. */
+  final int state;
+
+  private Status(int state) {
+    this.state = state;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/VersionedProtocol.java?rev=1514580&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/VersionedProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/VersionedProtocol.java Fri Aug 16 05:20:15 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+/**
+ * Superclass of all protocols that use Hadoop RPC.
+ * Subclasses of this interface are also supposed to have
+ * a static final long versionID field.
+ */
+public interface VersionedProtocol {
+
+  /**
+   * Return protocol version corresponding to protocol interface.
+   *
+   * @param protocol The classname of the protocol interface
+   * @param clientVersion The version of the protocol that the client speaks
+   * @return the version that the server will speak
+   * @throws IOException if the version cannot be determined
+   */
+  long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException;
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java?rev=1514580&r1=1514579&r2=1514580&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java Fri Aug 16 05:20:15 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.util.DistCacheUtils;
 
 public class DistributedCacheUtil {
 
@@ -79,7 +80,10 @@ public class DistributedCacheUtil {
       }
     }
     if (files.length() > 0) {
-      DistributedCache.addLocalFiles(conf, files.toString());
+      // I've replaced the use of the missing setLocalFiles and
+      // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils methods
+      // which set the cache configurations directly.
+      DistCacheUtils.addLocalFiles(conf, files.toString());
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1514580&r1=1514579&r2=1514580&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Fri Aug 16 05:20:15 2013
@@ -18,21 +18,44 @@
 package org.apache.hama.util;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
 import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hama.Constants;
+import org.apache.hama.ipc.Server;
 
 /**
  * NetUtils for our needs.
  */
 public class BSPNetUtils {
+  public static final Log LOG = LogFactory.getLog(BSPNetUtils.class);
+
   public static final int MAX_PORT_NUMBER = 65535;
 
+  private static Map<String, String> hostToResolved = new HashMap<String, String>();
+
   /**
    * Gets the canonical hostname of this machine.
    * 
@@ -136,4 +159,381 @@ public class BSPNetUtils {
     throw new NoSuchElementException("Could not find an available port "
         + "above " + fromPort);
   }
+
+  /**
+   * Returns InetSocketAddress that a client can use to connect to the server.
+   * Server.getListenerAddress() is not correct when the server binds to
+   * "0.0.0.0". This returns "127.0.0.1:port" when the getListenerAddress()
+   * returns "0.0.0.0:port".
+   * 
+   * @param server
+   * @return socket address that a client can use to connect to the server.
+   */
+  public static InetSocketAddress getConnectAddress(Server server) {
+    InetSocketAddress addr = server.getListenerAddress();
+    if (addr.getAddress().isAnyLocalAddress()) {
+      addr = makeSocketAddr("127.0.0.1", addr.getPort());
+    }
+    return addr;
+  }
+
+  /**
+   * Create a socket address with the given host and port. The hostname might be
+   * replaced with another host that was set via
+   * {@link #addStaticResolution(String, String)}. The value of
+   * hadoop.security.token.service.use_ip will determine whether the standard
+   * java host resolver is used, or if the fully qualified resolver is used.
+   * 
+   * @param host the hostname or IP used to instantiate the object
+   * @param port the port number
+   * @return InetSocketAddress
+   */
+  public static InetSocketAddress makeSocketAddr(String host, int port) {
+    String staticHost = getStaticResolution(host);
+    String resolveHost = (staticHost != null) ? staticHost : host;
+
+    InetSocketAddress addr;
+    try {
+      InetAddress iaddr = SecurityUtil.getByName(resolveHost);
+      // if there is a static entry for the host, make the returned
+      // address look like the original given host
+      if (staticHost != null) {
+        iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
+      }
+      addr = new InetSocketAddress(iaddr, port);
+    } catch (UnknownHostException e) {
+      addr = InetSocketAddress.createUnresolved(host, port);
+    }
+    return addr;
+  }
+
+  /**
+   * Retrieves the resolved name for the passed host. The resolved name must
+   * have been set earlier using
+   * {@link NetUtils#addStaticResolution(String, String)}
+   * 
+   * @param host
+   * @return the resolution
+   */
+  public static String getStaticResolution(String host) {
+    synchronized (hostToResolved) {
+      return hostToResolved.get(host);
+    }
+  }
+
+  /**
+   * Handle the transition from pairs of attributes specifying a host and port
+   * to a single colon separated one.
+   * 
+   * @param conf the configuration to check
+   * @param oldBindAddressName the old address attribute name
+   * @param oldPortName the old port attribute name
+   * @param newBindAddressName the new combined name
+   * @return the complete address from the configuration
+   */
+  public static String getServerAddress(Configuration conf,
+      String oldBindAddressName, String oldPortName, String newBindAddressName) {
+    String oldAddr = conf.get(oldBindAddressName);
+    String oldPort = conf.get(oldPortName);
+    String newAddrPort = conf.get(newBindAddressName);
+    if (oldAddr == null && oldPort == null) {
+      return newAddrPort;
+    }
+    String[] newAddrPortParts = newAddrPort.split(":", 2);
+    if (newAddrPortParts.length != 2) {
+      throw new IllegalArgumentException("Invalid address/port: " + newAddrPort);
+    }
+    if (oldAddr == null) {
+      oldAddr = newAddrPortParts[0];
+    } else {
+      LOG.warn("Configuration parameter " + oldBindAddressName
+          + " is deprecated. Use " + newBindAddressName + " instead.");
+    }
+    if (oldPort == null) {
+      oldPort = newAddrPortParts[1];
+    } else {
+      LOG.warn("Configuration parameter " + oldPortName
+          + " is deprecated. Use " + newBindAddressName + " instead.");
+    }
+    return oldAddr + ":" + oldPort;
+  }
+
+  /**
+   * Util method to build socket addr from either: <host>:<port>
+   * <fs>://<host>:<port>/<path>
+   */
+  public static InetSocketAddress createSocketAddr(String target) {
+    return createSocketAddr(target, -1);
+  }
+
+  /**
+   * Util method to build socket addr from either: <host>, <host>:<port> or
+   * <fs>://<host>:<port>/<path>; defaultPort is used when target has no port.
+   */
+  public static InetSocketAddress createSocketAddr(String target,
+      int defaultPort) {
+    if (target == null) {
+      throw new IllegalArgumentException("Socket address is null");
+    }
+    boolean hasScheme = target.contains("://");
+    URI uri;
+    try {
+      // a placeholder scheme lets URI parse a bare host[:port] authority
+      uri = URI.create(hasScheme ? target : "dummyscheme://" + target);
+    } catch (IllegalArgumentException e) {
+      // keep the URI syntax error as the cause so the bad input is traceable
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target, e);
+    }
+
+    String host = uri.getHost();
+    int port = uri.getPort();
+    if (port == -1) {
+      port = defaultPort;
+    }
+    String path = uri.getPath();
+    if ((host == null) || (port < 0)
+        || (!hasScheme && path != null && !path.isEmpty())) {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target);
+    }
+    return makeSocketAddr(host, port);
+  }
+
+  /**
+   * Checks if {@code host} is a local host name and return {@link InetAddress}
+   * corresponding to that address.
+   * 
+   * @param host the specified host
+   * @return a valid local {@link InetAddress} or null
+   * @throws SocketException if an I/O error occurs
+   */
+  public static InetAddress getLocalInetAddress(String host)
+      throws SocketException {
+    if (host == null) {
+      return null;
+    }
+    InetAddress addr = null;
+    try {
+      addr = InetAddress.getByName(host);
+      if (NetworkInterface.getByInetAddress(addr) == null) {
+        addr = null; // Not a local address
+      }
+    } catch (UnknownHostException ignore) {
+    }
+    return addr;
+  }
+
+  /**
+   * This is a drop-in replacement for
+   * {@link Socket#connect(SocketAddress, int)}. In the case of normal sockets
+   * that don't have associated channels, this just invokes
+   * <code>socket.connect(endpoint, timeout)</code>. If
+   * <code>socket.getChannel()</code> returns a non-null channel, connect is
+   * implemented using Hadoop's selectors. This is done mainly to avoid Sun's
+   * connect implementation from creating thread-local selectors, since Hadoop
+   * does not have control on when these are closed and could end up taking all
+   * the available file descriptors.
+   * 
+   * @see java.net.Socket#connect(java.net.SocketAddress, int)
+   * 
+   * @param socket
+   * @param address the remote address
+   * @param timeout timeout in milliseconds
+   */
+  public static void connect(Socket socket, SocketAddress address, int timeout)
+      throws IOException {
+    connect(socket, address, null, timeout);
+  }
+
+  /**
+   * Like {@link NetUtils#connect(Socket, SocketAddress, int)} but also takes a
+   * local address and port to bind the socket to.
+   * 
+   * @param socket
+   * @param endpoint the remote address
+   * @param localAddr the local address to bind the socket to
+   * @param timeout timeout in milliseconds
+   */
+  public static void connect(Socket socket, SocketAddress endpoint,
+      SocketAddress localAddr, int timeout) throws IOException {
+    if (socket == null || endpoint == null || timeout < 0) {
+      throw new IllegalArgumentException("Illegal argument for connect()");
+    }
+
+    SocketChannel ch = socket.getChannel();
+
+    if (localAddr != null) {
+      socket.bind(localAddr);
+    }
+
+    if (ch == null) {
+      // let the default implementation handle it.
+      socket.connect(endpoint, timeout);
+    } else {
+      SocketIOWithTimeout.connect(ch, endpoint, timeout);
+    }
+
+    // There is a very rare case allowed by the TCP specification, such that
+    // if we are trying to connect to an endpoint on the local machine,
+    // and we end up choosing an ephemeral port equal to the destination port,
+    // we will actually end up getting connected to ourself (ie any data we
+    // send just comes right back). This is only possible if the target
+    // daemon is down, so we'll treat it like connection refused.
+    if (socket.getLocalPort() == socket.getPort()
+        && socket.getLocalAddress().equals(socket.getInetAddress())) {
+      LOG.info("Detected a loopback TCP socket, disconnecting it");
+      socket.close();
+      throw new ConnectException(
+          "Localhost targeted connection resulted in a loopback. "
+              + "No daemon is listening on the target port.");
+    }
+  }
+
+  /**
+   * Same as getInputStream(socket, socket.getSoTimeout()).<br>
+   * <br>
+   * 
+   * From documentation for {@link #getInputStream(Socket, long)}:<br>
+   * Returns InputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a {@link SocketInputStream} with the given
+   * timeout. If the socket does not have a channel,
+   * {@link Socket#getInputStream()} is returned. In the latter case, the timeout
+   * argument is ignored and the timeout set with
+   * {@link Socket#setSoTimeout(int)} applies for reads.<br>
+   * <br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils},
+   * must use this interface instead of {@link Socket#getInputStream()}.
+   * 
+   * @see #getInputStream(Socket, long)
+   * 
+   * @param socket
+   * @return InputStream for reading from the socket.
+   * @throws IOException
+   */
+  public static InputStream getInputStream(Socket socket) throws IOException {
+    return getInputStream(socket, socket.getSoTimeout());
+  }
+
+  /**
+   * Returns InputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a {@link SocketInputStream} with the given
+   * timeout. If the socket does not have a channel,
+   * {@link Socket#getInputStream()} is returned. In the latter case, the timeout
+   * argument is ignored and the timeout set with
+   * {@link Socket#setSoTimeout(int)} applies for reads.<br>
+   * <br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils},
+   * must use this interface instead of {@link Socket#getInputStream()}.
+   * 
+   * @see Socket#getChannel()
+   * 
+   * @param socket
+   * @param timeout timeout in milliseconds. This may not always apply. zero for
+   *          waiting as long as necessary.
+   * @return InputStream for reading from the socket.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public static InputStream getInputStream(Socket socket, long timeout)
+      throws IOException {
+    return (socket.getChannel() == null) ? socket.getInputStream()
+        : new SocketInputStream(socket, timeout);
+  }
+
+  /**
+   * Same as getOutputStream(socket, 0). Timeout of zero implies write will wait
+   * until data is available.<br>
+   * <br>
+   * 
+   * From documentation for {@link #getOutputStream(Socket, long)} : <br>
+   * Returns OutputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a {@link SocketOutputStream} with the given
+   * timeout. If the socket does not have a channel,
+   * {@link Socket#getOutputStream()} is returned. In the latter case, the
+   * timeout argument is ignored and the write will wait until data is
+   * available.<br>
+   * <br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils},
+   * must use this interface instead of {@link Socket#getOutputStream()}.
+   * 
+   * @see #getOutputStream(Socket, long)
+   * 
+   * @param socket
+   * @return OutputStream for writing to the socket.
+   * @throws IOException
+   */
+  public static OutputStream getOutputStream(Socket socket) throws IOException {
+    return getOutputStream(socket, 0);
+  }
+
+  /**
+   * Returns OutputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a {@link SocketOutputStream} with the given
+   * timeout. If the socket does not have a channel,
+   * {@link Socket#getOutputStream()} is returned. In the latter case, the
+   * timeout argument is ignored and the write will wait until data is
+   * available.<br>
+   * <br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils},
+   * must use this interface instead of {@link Socket#getOutputStream()}.
+   * 
+   * @see Socket#getChannel()
+   * 
+   * @param socket
+   * @param timeout timeout in milliseconds. This may not always apply. zero for
+   *          waiting as long as necessary.
+   * @return OutputStream for writing to the socket.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public static OutputStream getOutputStream(Socket socket, long timeout)
+      throws IOException {
+    return (socket.getChannel() == null) ? socket.getOutputStream()
+        : new SocketOutputStream(socket, timeout);
+  }
+
+  /**
+   * Get the default socket factory as specified by the configuration parameter
+   * <tt>hadoop.rpc.socket.factory.class.default</tt>
+   * 
+   * @param conf the configuration
+   * @return the default socket factory as specified in the configuration or the
+   *         JVM default socket factory if the configuration does not contain a
+   *         default socket factory property.
+   */
+  public static SocketFactory getDefaultSocketFactory(Configuration conf) {
+
+    String propValue = conf.get("hadoop.rpc.socket.factory.class.default");
+    if ((propValue == null) || (propValue.length() == 0))
+      return SocketFactory.getDefault();
+
+    return getSocketFactoryFromProperty(conf, propValue);
+  }
+
+  /**
+   * Instantiates the socket factory whose class name is given. The class is
+   * looked up with conf.getClassByName and created with
+   * ReflectionUtils.newInstance.
+   * @param conf the configuration used to load and create the factory
+   * @param propValue the property which is the class name of the SocketFactory
+   *          to instantiate; assumed non null and non empty.
+   * @return a socket factory as defined in the property value.
+   * @throws RuntimeException if the class cannot be found
+   */
+  public static SocketFactory getSocketFactoryFromProperty(Configuration conf,
+      String propValue) {
+    try {
+      Class<?> theClass = conf.getClassByName(propValue);
+      return (SocketFactory) org.apache.hadoop.util.ReflectionUtils
+          .newInstance(theClass, conf);
+    } catch (ClassNotFoundException cnfe) {
+      // chain the cause so the original stack trace is not lost
+      throw new RuntimeException("Socket Factory class not found: " + cnfe, cnfe);
+    }
+  }
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/util/DistCacheUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/DistCacheUtils.java?rev=1514580&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/DistCacheUtils.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/DistCacheUtils.java Fri Aug 16 05:20:15 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DistCacheUtils {
+  private static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
+
+  /** Appends str to the comma-separated list stored under the cache key. */
+  public static void addLocalFiles(Configuration conf, String str) {
+    String existing = conf.get(CACHE_LOCALFILES);
+    String merged = (existing != null) ? existing + "," + str : str;
+    conf.set(CACHE_LOCALFILES, merged);
+  }
+
+  /** Overwrites the value stored under the cache key with str. */
+  public static void setLocalFiles(Configuration conf, String str) {
+    conf.set(CACHE_LOCALFILES, str);
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java?rev=1514580&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java Fri Aug 16 05:20:15 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.TreeMap;
+
+/** A driver that is used to run programs added to it
+ */
+
+public class ProgramDriver {
+
+  /**
+   * Programs registered with this driver, keyed by the name used to run
+   * them. The constructor installs a TreeMap, so usage is listed by name.
+   * @date april 2006
+   */
+  Map<String, ProgramDescription> programs;
+
+  public ProgramDriver(){
+    programs = new TreeMap<String, ProgramDescription>();
+  }
+
+  /** A description of a program: its main method and a readable summary. */
+  static private class ProgramDescription {
+    static final Class<?>[] paramTypes = new Class<?>[] {String[].class};
+
+    /**
+     * Create a description of an example program.
+     * @param mainClass the class with the main for the example program
+     * @param description a string to display to the user in help messages
+     * @throws SecurityException if we can't use reflection
+     * @throws NoSuchMethodException if the class doesn't have a main method
+     */
+    public ProgramDescription(Class<?> mainClass,
+                              String description)
+      throws SecurityException, NoSuchMethodException {
+      this.main = mainClass.getMethod("main", paramTypes);
+      this.description = description;
+    }
+
+    /**
+     * Invoke the example application with the given arguments
+     * @param args the arguments for the application
+     * @throws Throwable The exception thrown by the invoked method
+     */
+    public void invoke(String[] args)
+      throws Throwable {
+      try {
+        main.invoke(null, new Object[]{args});
+      } catch (InvocationTargetException except) {
+        throw except.getCause();
+      }
+    }
+
+    public String getDescription() {
+      return description;
+    }
+
+    private final Method main;
+    private final String description;
+  }
+
+  private static void printUsage(Map<String, ProgramDescription> programs) {
+    System.out.println("Valid program names are:");
+    for(Map.Entry<String, ProgramDescription> item : programs.entrySet()) {
+      System.out.println("  " + item.getKey() + ": " +
+                         item.getValue().getDescription());
+    }
+  }
+
+  /**
+   * Registers a program class with this driver.
+   * @param name The name under which the program can be invoked
+   * @param mainClass The class whose main method runs the program
+   * @param description The description of the class
+   * @throws NoSuchMethodException if mainClass has no public main(String[])
+   * @throws SecurityException if reflection on mainClass is not permitted
+   */
+  public void addClass (String name, Class<?> mainClass, String description) throws Throwable {
+    programs.put(name , new ProgramDescription(mainClass, description));
+  }
+
+  /**
+   * This is a driver for the example programs.
+   * It looks at the first command line argument and tries to find an
+   * example program with that name.
+   * If it is found, it calls the main method in that class with the rest
+   * of the command line arguments.
+   * If no program name is given, or the name is not registered, the valid
+   * names are printed and the JVM is terminated with exit status -1.
+   * @param args The argument from the user. args[0] is the command to run.
+   * @throws IllegalAccessException if the program's main method cannot be
+   *           accessed reflectively
+   * @throws IllegalArgumentException if the reflective call rejects the
+   *           argument array
+   * @throws Throwable Anything thrown by the example program's main
+   */
+  public void driver(String[] args)
+    throws Throwable
+  {
+    // Make sure they gave us a program name.
+    if (args.length == 0) {
+      System.out.println("An example program must be given as the" +
+                         " first argument.");
+      printUsage(programs);
+      System.exit(-1);
+    }
+
+    // And that it is good.
+    ProgramDescription pgm = programs.get(args[0]);
+    if (pgm == null) {
+      System.out.println("Unknown program '" + args[0] + "' chosen.");
+      printUsage(programs);
+      System.exit(-1);
+    }
+
+    // Remove the leading argument and call main
+    String[] new_args = new String[args.length - 1];
+    System.arraycopy(args, 1, new_args, 0, new_args.length);
+    pgm.invoke(new_args);
+  }
+
+}
\ No newline at end of file

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java?rev=1514580&r1=1514579&r2=1514580&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java Fri Aug 16 05:20:15 2013
@@ -116,7 +116,7 @@ public class RunJar {
       public void run() {
         try {
           FileUtil.fullyDelete(workDir);
-        } catch (IOException e) {
+        } catch (Exception e) {
         }
       }
     });



Mime
View raw message