hbase-commits mailing list archives

From apurt...@apache.org
Subject svn commit: r724238 [2/3] - in /hadoop/hbase/branches/0.19_on_hadoop_0.18: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apach...
Date Mon, 08 Dec 2008 03:20:44 GMT
Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sun Dec  7 19:20:43 2008
@@ -0,0 +1,1061 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
+ * parameter, and return a {@link Writable} as their value.  A service runs on
+ * a port and is defined by a parameter class and a value class.
+ *
+ * <p>Copied locally so we can fix HBASE-900.
+ * 
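+ * <p>A minimal usage sketch (illustrative only; <code>MyParam</code> and
+ * <code>conf</code> are assumed, not defined here):
+ * <pre>
+ * HBaseServer server =
+ *     new HBaseServer("0.0.0.0", 60020, MyParam.class, 10, conf) {
+ *       public Writable call(Writable param, long receiveTime)
+ *           throws IOException {
+ *         return param; // echo the request back as the response
+ *       }
+ *     };
+ * server.start(); // begin accepting connections and handling calls
+ * server.join();  // block until stop() is called
+ * </pre>
+ *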
+ * @see HBaseClient
+ */
+public abstract class HBaseServer {
+  
+  /**
+   * The first four bytes of Hadoop RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  
+  // 1 : Introduce ping and server does not throw away RPCs
+  public static final byte CURRENT_VERSION = 2;
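+  // On connect, a client first sends a 5-byte preamble: the 4-byte "hrpc"
+  // magic above followed by a 1-byte version number (checked against
+  // CURRENT_VERSION in Connection.readAndProcess below).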
+  
+  /**
+   * How many calls/handler are allowed in the queue.
+   */
+  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+  
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
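+  // Logged under the o.a.h.ipc package (rather than o.a.h.hbase) so that
+  // blanket-enabling DEBUG on the hbase packages does not also turn on
+  // per-call logging here; HbaseRPC does the same (see below).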
+
+  private static final ThreadLocal<HBaseServer> SERVER = new ThreadLocal<HBaseServer>();
+
+  /** Returns the server instance the current thread is running under, or
+   * null.  May be called from {@link #call(Writable, long)} implementations,
+   * and from {@link Writable} methods of parameters and return values.
+   * Permits applications to access the server context. */
+  public static HBaseServer get() {
+    return SERVER.get();
+  }
+ 
+  /** Set to the Call object before a Handler invokes an RPC, and reset
+   * after the call returns.
+   */
+  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+  
+  /** Returns the remote-side IP address when invoked inside an RPC.
+   *  Returns null in case of an error.
+   */
+  public static InetAddress getRemoteIp() {
+    Call call = CurCall.get();
+    if (call != null) {
+      return call.connection.socket.getInetAddress();
+    }
+    return null;
+  }
+  /** Returns remote address as a string when invoked inside an RPC.
+   *  Returns null in case of an error.
+   */
+  public static String getRemoteAddress() {
+    InetAddress addr = getRemoteIp();
+    return (addr == null) ? null : addr.getHostAddress();
+  }
+
+  private String bindAddress; 
+  private int port;                               // port we listen on
+  private int handlerCount;                       // number of handler threads
+  private Class<? extends Writable> paramClass;   // class of call parameters
+  private int maxIdleTime;                        // the maximum idle time after 
+                                                  // which a client may be disconnected
+  private int thresholdIdleConnections;           // the number of idle connections
+                                                  // after which we will start
+                                                  // cleaning up idle 
+                                                  // connections
+  int maxConnectionsToNuke;                       // the max number of
+                                                  // connections to nuke
+                                                  // during a cleanup
+  
+  protected HBaseRpcMetrics  rpcMetrics;
+  
+  private Configuration conf;
+
+  private int maxQueueSize;
+  private int socketSendBufferSize;
+  private final boolean tcpNoDelay; // if true, disable Nagle's algorithm
+
+  volatile private boolean running = true;         // true while server runs
+  private BlockingQueue<Call> callQueue; // queued calls
+
+  private List<Connection> connectionList =
+    Collections.synchronizedList(new LinkedList<Connection>());
+  // maintain a list of client connections
+  private Listener listener = null;
+  private Responder responder = null;
+  private int numConnections = 0;
+  private Handler[] handlers = null;
+
+  /**
+   * A convenience method to bind to a given address and report 
+   * better exceptions if the address is not a valid host.
+   * @param socket the socket to bind
+   * @param address the address to bind to
+   * @param backlog the number of connections allowed in the queue
+   * @throws BindException if the address can't be bound
+   * @throws UnknownHostException if the address isn't a valid host name
+   * @throws IOException other random errors from bind
+   */
+  public static void bind(ServerSocket socket, InetSocketAddress address, 
+                          int backlog) throws IOException {
+    try {
+      socket.bind(address, backlog);
+    } catch (BindException e) {
+      BindException bindException = new BindException("Problem binding to " + address
+                                                      + " : " + e.getMessage());
+      bindException.initCause(e);
+      throw bindException;
+    } catch (SocketException e) {
+      // If they try to bind to a different host's address, give a better
+      // error message.
+      if ("Unresolved address".equals(e.getMessage())) {
+        throw new UnknownHostException("Invalid hostname for server: " + 
+                                       address.getHostName());
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /** A call queued for handling. */
+  private static class Call {
+    private int id;                               // the client's call id
+    private Writable param;                       // the parameter passed
+    private Connection connection;                // connection to client
+    private long timestamp;                       // the time received when response is null
+                                                  // the time served when response is not null
+    private ByteBuffer response;                  // the response for this call
+
+    public Call(int id, Writable param, Connection connection) {
+      this.id = id;
+      this.param = param;
+      this.connection = connection;
+      this.timestamp = System.currentTimeMillis();
+      this.response = null;
+    }
+    
+    @Override
+    public String toString() {
+      return param.toString() + " from " + connection.toString();
+    }
+
+    public void setResponse(ByteBuffer response) {
+      this.response = response;
+    }
+  }
+
+  /** Listens on the socket. Creates jobs for the handler threads. */
+  private class Listener extends Thread {
+    
+    private ServerSocketChannel acceptChannel = null; //the accept channel
+    private Selector selector = null; //the selector that we use for the server
+    private InetSocketAddress address; //the address we bind at
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; // the last time a cleanup of idle
+                                         // connections ran
+    private long cleanupInterval = 10000; // the minimum interval between
+                                          // two cleanup runs
+    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
+    
+    public Listener() throws IOException {
+      address = new InetSocketAddress(bindAddress, port);
+      // Create a new server socket and set to non blocking mode
+      acceptChannel = ServerSocketChannel.open();
+      acceptChannel.configureBlocking(false);
+
+      // Bind the server socket to the local host and port
+      bind(acceptChannel.socket(), address, backlogLength);
+      port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
+      // create a selector
+      selector = Selector.open();
+
+      // Register accepts on the server socket with the selector.
+      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
+      this.setName("IPC Server listener on " + port);
+      this.setDaemon(true);
+    }
+
+    /** Clean up connections from connectionList.  Chooses a random range to
+     * scan, and limits the number of connections cleaned up per run.  The
+     * criterion for cleanup is how long the connection has been idle.  If
+     * 'force' is true, all connections are examined for cleanup.
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        int start = 0;
+        int end = numConnections - 1;
+        if (!force) {
+          // bounded nextInt avoids the negative indices that
+          // rand.nextInt() % numConnections could produce
+          start = rand.nextInt(numConnections);
+          end = rand.nextInt(numConnections);
+          if (end < start) {
+            int temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c;
+          synchronized (connectionList) {
+            try {
+              c = connectionList.get(i);
+            } catch (Exception e) {return;}
+          }
+          if (c.timedOut(currentTime)) {
+            if (LOG.isDebugEnabled())
+              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+            closeConnection(c);
+            numNuked++;
+            end--;
+            c = null;
+            if (!force && numNuked == maxConnectionsToNuke) break;
+          }
+          else i++;
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      while (running) {
+        SelectionKey key = null;
+        try {
+          selector.select();
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid()) {
+                if (key.isAcceptable())
+                  doAccept(key);
+                else if (key.isReadable())
+                  doRead(key);
+              }
+            } catch (IOException e) {
+            }
+            key = null;
+          }
+        } catch (OutOfMemoryError e) {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give 
+          // some thread(s) a chance to finish
+          LOG.warn("Out of Memory in server select", e);
+          closeCurrentConnection(key, e);
+          cleanupConnections(true);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
+        } catch (Exception e) {
+          closeCurrentConnection(key, e);
+        }
+        cleanupConnections(false);
+      }
+      LOG.info("Stopping " + this.getName());
+
+      synchronized (this) {
+        try {
+          acceptChannel.close();
+          selector.close();
+        } catch (IOException e) { }
+
+        selector= null;
+        acceptChannel= null;
+        
+        // clean up all connections
+        while (!connectionList.isEmpty()) {
+          closeConnection(connectionList.remove(0));
+        }
+      }
+    }
+
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
+      if (key != null) {
+        Connection c = (Connection)key.attachment();
+        if (c != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+          closeConnection(c);
+          c = null;
+        }
+      }
+    }
+
+    InetSocketAddress getAddress() {
+      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
+    }
+    
+    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
+      Connection c = null;
+      ServerSocketChannel server = (ServerSocketChannel) key.channel();
+      // accept up to 10 connections
+      for (int i=0; i<10; i++) {
+        SocketChannel channel = server.accept();
+        if (channel==null) return;
+
+        channel.configureBlocking(false);
+        channel.socket().setTcpNoDelay(tcpNoDelay);
+        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+        c = new Connection(readKey, channel, System.currentTimeMillis());
+        readKey.attach(c);
+        synchronized (connectionList) {
+          connectionList.add(numConnections, c);
+          numConnections++;
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Server connection from " + c.toString() +
+              "; # active connections: " + numConnections +
+              "; # queued calls: " + callQueue.size());
+      }
+    }
+
+    void doRead(SelectionKey key) throws InterruptedException {
+      int count = 0;
+      Connection c = (Connection)key.attachment();
+      if (c == null) {
+        return;  
+      }
+      c.setLastContact(System.currentTimeMillis());
+      
+      try {
+        count = c.readAndProcess();
+      } catch (InterruptedException ieo) {
+        throw ieo;
+      } catch (Exception e) {
+        LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        count = -1; //so that the (count < 0) block is executed
+      }
+      if (count < 0) {
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + ": disconnecting client " + 
+                    c.getHostAddress() + ". Number of active connections: "+
+                    numConnections);
+        closeConnection(c);
+        c = null;
+      }
+      else {
+        c.setLastContact(System.currentTimeMillis());
+      }
+    }   
+
+    synchronized void doStop() {
+      if (selector != null) {
+        selector.wakeup();
+        Thread.yield();
+      }
+      if (acceptChannel != null) {
+        try {
+          acceptChannel.socket().close();
+        } catch (IOException e) {
+          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+        }
+      }
+    }
+  }
+
+  // Sends RPC responses back to clients.
+  private class Responder extends Thread {
+    private Selector writeSelector;
+    private int pending;         // connections waiting to register
+    
+    final static int PURGE_INTERVAL = 900000; // 15mins
+
+    Responder() throws IOException {
+      this.setName("IPC Server Responder");
+      this.setDaemon(true);
+      writeSelector = Selector.open(); // create a selector
+      pending = 0;
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      long lastPurgeTime = 0;   // last check for old calls.
+
+      while (running) {
+        try {
+          waitPending();     // If a channel is being registered, wait.
+          writeSelector.select(PURGE_INTERVAL);
+          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            SelectionKey key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid() && key.isWritable()) {
+                  doAsyncWrite(key);
+              }
+            } catch (IOException e) {
+              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+            }
+          }
+          long now = System.currentTimeMillis();
+          if (now < lastPurgeTime + PURGE_INTERVAL) {
+            continue;
+          }
+          lastPurgeTime = now;
+          //
+          // If there were some calls that have not been sent out for a
+          // long time, discard them.
+          //
+          LOG.debug("Checking for old call responses.");
+          ArrayList<Call> calls;
+          
+          // get the list of channels from list of keys.
+          synchronized (writeSelector.keys()) {
+            calls = new ArrayList<Call>(writeSelector.keys().size());
+            iter = writeSelector.keys().iterator();
+            while (iter.hasNext()) {
+              SelectionKey key = iter.next();
+              Call call = (Call)key.attachment();
+              if (call != null && key.channel() == call.connection.channel) { 
+                calls.add(call);
+              }
+            }
+          }
+          
+          for(Call call : calls) {
+            try {
+              doPurge(call, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
+          }
+        } catch (OutOfMemoryError e) {
+          //
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          //
+          LOG.warn("Out of Memory in server select", e);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (Exception e) {
+          LOG.warn("Exception in Responder " + 
+                   StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info("Stopping " + this.getName());
+    }
+
+    private void doAsyncWrite(SelectionKey key) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        throw new IOException("doAsyncWrite: bad channel");
+      }
+
+      synchronized(call.connection.responseQueue) {
+        if (processResponse(call.connection.responseQueue, false)) {
+          try {
+            key.interestOps(0);
+          } catch (CancelledKeyException e) {
+            /* The Listener/reader might have closed the socket.
+             * We don't explicitly cancel the key, so not sure if this will
+             * ever fire.
+             * This warning could be removed.
+             */
+            LOG.warn("Exception while changing ops : " + e);
+          }
+        }
+      }
+    }
+
+    //
+    // Remove calls that have been pending in the responseQueue 
+    // for a long time.
+    //
+    private void doPurge(Call call, long now) throws IOException {
+      LinkedList<Call> responseQueue = call.connection.responseQueue;
+      synchronized (responseQueue) {
+        Iterator<Call> iter = responseQueue.listIterator(0);
+        while (iter.hasNext()) {
+          call = iter.next();
+          if (now > call.timestamp + PURGE_INTERVAL) {
+            closeConnection(call.connection);
+            break;
+          }
+        }
+      }
+    }
+
+    // Processes one response.  Returns true if there is no more pending
+    // data for this channel.
+    //
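+    // A write is first attempted inline; if the channel cannot take the
+    // whole response, the call is pushed back onto the front of the queue
+    // and (when invoked from a Handler) the channel is registered with the
+    // write selector for OP_WRITE so the Responder finishes it later.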
+    private boolean processResponse(LinkedList<Call> responseQueue,
+                                    boolean inHandler) throws IOException {
+      boolean error = true;
+      boolean done = false;       // there is more data for this channel.
+      int numElements = 0;
+      Call call = null;
+      try {
+        synchronized (responseQueue) {
+          //
+          // If there are no items for this channel, then we are done
+          //
+          numElements = responseQueue.size();
+          if (numElements == 0) {
+            error = false;
+            return true;              // no more data for this channel.
+          }
+          //
+          // Extract the first call
+          //
+          call = responseQueue.removeFirst();
+          SocketChannel channel = call.connection.channel;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                      call.connection);
+          }
+          //
+          // Send as much data as we can in the non-blocking fashion
+          //
+          int numBytes = channel.write(call.response);
+          if (numBytes < 0) {
+            return true;
+          }
+          if (!call.response.hasRemaining()) {
+            call.connection.decRpcCount();
+            if (numElements == 1) {    // last call fully processed.
+              done = true;             // no more data for this channel.
+            } else {
+              done = false;            // more calls pending to be sent.
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote " + numBytes + " bytes.");
+            }
+          } else {
+            //
+            // If we were unable to write the entire response out, then 
+            // insert in Selector queue. 
+            //
+            call.connection.responseQueue.addFirst(call);
+            
+            if (inHandler) {
+              // set the serve time when the response has to be sent later
+              call.timestamp = System.currentTimeMillis();
+              
+              incPending();
+              try {
+                // Wakeup the thread blocked on select, only then can the call 
+                // to channel.register() complete.
+                writeSelector.wakeup();
+                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+              } catch (ClosedChannelException e) {
+                // It's OK; the channel might be closed elsewhere.
+                done = true;
+              } finally {
+                decPending();
+              }
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote partial " + numBytes + 
+                        " bytes.");
+            }
+          }
+          error = false;              // everything went off well
+        }
+      } finally {
+        if (error && call != null) {
+          LOG.warn(getName()+", call " + call + ": output error");
+          done = true;               // error. no more data for this channel.
+          closeConnection(call.connection);
+        }
+      }
+      return done;
+    }
+
+    //
+    // Enqueue a response from the application.
+    //
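+    // If this call is the only one queued for the connection, try to write
+    // it inline on the calling handler thread; otherwise the Responder
+    // thread will drain the queue asynchronously.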
+    void doRespond(Call call) throws IOException {
+      synchronized (call.connection.responseQueue) {
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          processResponse(call.connection.responseQueue, true);
+        }
+      }
+    }
+
+    private synchronized void incPending() {   // call waiting to be enqueued.
+      pending++;
+    }
+
+    private synchronized void decPending() { // call done enqueueing.
+      pending--;
+      notify();
+    }
+
+    private synchronized void waitPending() throws InterruptedException {
+      while (pending > 0) {
+        wait();
+      }
+    }
+  }
+
+  /** Reads calls from a connection and queues them for handling. */
+  private class Connection {
+    private boolean versionRead = false; //if initial signature and
+                                         //version are read
+    private boolean headerRead = false;  //if the connection header that
+                                         //follows version is read.
+    private SocketChannel channel;
+    private ByteBuffer data;
+    private ByteBuffer dataLengthBuffer;
+    private LinkedList<Call> responseQueue;
+    private volatile int rpcCount = 0; // number of outstanding rpcs
+    private long lastContact;
+    private int dataLength;
+    private Socket socket;
+    // Cache the remote host & port info so that even if the socket is 
+    // disconnected, we can say where it used to connect to.
+    private String hostAddress;
+    private int remotePort;
+    private UserGroupInformation ticket = null;
+
+    public Connection(SelectionKey key, SocketChannel channel, 
+                      long lastContact) {
+      this.channel = channel;
+      this.lastContact = lastContact;
+      this.data = null;
+      this.dataLengthBuffer = ByteBuffer.allocate(4);
+      this.socket = channel.socket();
+      InetAddress addr = socket.getInetAddress();
+      if (addr == null) {
+        this.hostAddress = "*Unknown*";
+      } else {
+        this.hostAddress = addr.getHostAddress();
+      }
+      this.remotePort = socket.getPort();
+      this.responseQueue = new LinkedList<Call>();
+      if (socketSendBufferSize != 0) {
+        try {
+          socket.setSendBufferSize(socketSendBufferSize);
+        } catch (IOException e) {
+          LOG.warn("Connection: unable to set socket send buffer size to " +
+                   socketSendBufferSize);
+        }
+      }
+    }   
+
+    @Override
+    public String toString() {
+      return getHostAddress() + ":" + remotePort; 
+    }
+    
+    public String getHostAddress() {
+      return hostAddress;
+    }
+
+    public void setLastContact(long lastContact) {
+      this.lastContact = lastContact;
+    }
+
+    public long getLastContact() {
+      return lastContact;
+    }
+
+    /* Return true if the connection has no outstanding rpc */
+    private boolean isIdle() {
+      return rpcCount == 0;
+    }
+    
+    /* Decrement the outstanding RPC count */
+    private void decRpcCount() {
+      rpcCount--;
+    }
+    
+    /* Increment the outstanding RPC count */
+    private void incRpcCount() {
+      rpcCount++;
+    }
+    
+    private boolean timedOut(long currentTime) {
+      if (isIdle() && currentTime -  lastContact > maxIdleTime)
+        return true;
+      return false;
+    }
+
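+    // Per-connection wire format: the "hrpc" magic and a version byte,
+    // then a header frame carrying the ticket, then one frame per call.
+    // Each frame is a 4-byte length followed by that many bytes of data;
+    // a length equal to HBaseClient.PING_CALL_ID marks a ping with no body.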
+    public int readAndProcess() throws IOException, InterruptedException {
+      while (true) {
+        /* Read at most one RPC. If the header is not read completely yet
+         * then iterate until we read first RPC or until there is no data left.
+         */    
+        int count = -1;
+        if (dataLengthBuffer.remaining() > 0) {
+          count = channel.read(dataLengthBuffer);       
+          if (count < 0 || dataLengthBuffer.remaining() > 0) 
+            return count;
+        }
+      
+        if (!versionRead) {
+          //Every connection is expected to send the header.
+          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
+          count = channel.read(versionBuffer);
+          if (count <= 0) {
+            return count;
+          }
+          int version = versionBuffer.get(0);
+          
+          dataLengthBuffer.flip();          
+          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+            //Warning is ok since this is not supposed to happen.
+            LOG.warn("Incorrect header or version mismatch from " + 
+                     hostAddress + ":" + remotePort +
+                     " got version " + version + 
+                     " expected version " + CURRENT_VERSION);
+            return -1;
+          }
+          dataLengthBuffer.clear();
+          versionRead = true;
+          continue;
+        }
+        
+        if (data == null) {
+          dataLengthBuffer.flip();
+          dataLength = dataLengthBuffer.getInt();
+       
+          if (dataLength == HBaseClient.PING_CALL_ID) {
+            dataLengthBuffer.clear();
+            return 0;  //ping message
+          }
+          data = ByteBuffer.allocate(dataLength);
+          incRpcCount();  // Increment the rpc count
+        }
+        
+        count = channel.read(data);
+        
+        if (data.remaining() == 0) {
+          dataLengthBuffer.clear();
+          data.flip();
+          if (headerRead) {
+            processData();
+            data = null;
+            return count;
+          } else {
+            processHeader();
+            headerRead = true;
+            data = null;
+            continue;
+          }
+        } 
+        return count;
+      }
+    }
+
+    // Reads the header that follows the version byte
+    private void processHeader() throws IOException {
+      /* In the current version, it is just a ticket.
+       * Later we could introduce a "ConnectionHeader" class.
+       */
+      DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(data.array()));
+      ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
+    }
+    
+    private void processData() throws  IOException, InterruptedException {
+      DataInputStream dis =
+        new DataInputStream(new ByteArrayInputStream(data.array()));
+      int id = dis.readInt();                    // try to read an id
+        
+      if (LOG.isDebugEnabled())
+        LOG.debug(" got #" + id);
+            
+      Writable param =
+        (Writable) ReflectionUtils.newInstance(paramClass, conf); // read param
+      param.readFields(dis);
+        
+      Call call = new Call(id, param, this);
+      callQueue.put(call);              // queue the call; maybe blocked here
+    }
+
+    private synchronized void close() throws IOException {
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      try {socket.shutdownOutput();} catch(Exception e) {}
+      if (channel.isOpen()) {
+        try {channel.close();} catch(Exception e) {}
+      }
+      try {socket.close();} catch(Exception e) {}
+    }
+  }
+
+  /** Handles queued calls. */
+  private class Handler extends Thread {
+    public Handler(int instanceNumber) {
+      this.setDaemon(true);
+      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      final int buffersize = 16 * 1024;
+      ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
+      while (running) {
+        try {
+          Call call = callQueue.take(); // pop the queue; maybe blocked here
+
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": has #" + call.id + " from " +
+                      call.connection);
+          
+          String errorClass = null;
+          String error = null;
+          Writable value = null;
+          
+          CurCall.set(call);
+          UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
+          UserGroupInformation.setCurrentUGI(call.connection.ticket);
+          try {
+            value = call(call.param, call.timestamp);             // make the call
+          } catch (Throwable e) {
+            LOG.info(getName()+", call "+call+": error: " + e, e);
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
+          }
+          UserGroupInformation.setCurrentUGI(previous);
+          CurCall.set(null);
+
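+          // Response wire format: the call id (int) and an error flag
+          // (boolean), followed by either the serialized return value or
+          // the error class name and the stringified exception.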
+          if (buf.size() > buffersize) {
+            // Allocate a new BAOS as reset only moves size back to zero but
+            // keeps the buffer of whatever the largest write was -- see
+            // hbase-900.
+            buf = new ByteArrayOutputStream(buffersize);
+          } else {
+            buf.reset();
+          }
+          DataOutputStream out = new DataOutputStream(buf);
+          out.writeInt(call.id);                // write call id
+          out.writeBoolean(error != null);      // write error flag
+
+          if (error == null) {
+            value.write(out);
+          } else {
+            WritableUtils.writeString(out, errorClass);
+            WritableUtils.writeString(out, error);
+          }
+          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+          responder.doRespond(call);
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
+        } catch (Exception e) {
+          LOG.info(getName() + " caught: " +
+                   StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info(getName() + ": exiting");
+    }
+
+  }
+  
+  protected HBaseServer(String bindAddress, int port,
+                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Configuration conf)
+    throws IOException 
+  {
+    this(bindAddress, port, paramClass, handlerCount,  conf, Integer.toString(port));
+  }
+
+  /** Constructs a server listening on the named port and address.  Parameters
+   * passed must be of the named class.  The <code>handlerCount</code>
+   * determines the number of handler threads that will be used to process
+   * calls.
+   */
+  protected HBaseServer(String bindAddress, int port, 
+                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Configuration conf, String serverName) 
+    throws IOException {
+    this.bindAddress = bindAddress;
+    this.conf = conf;
+    this.port = port;
+    this.paramClass = paramClass;
+    this.handlerCount = handlerCount;
+    this.socketSendBufferSize = 0;
+    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
+    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
+    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    
+    // Start the listener here and let it bind to the port
+    listener = new Listener();
+    this.port = listener.getAddress().getPort();    
+    this.rpcMetrics = new HBaseRpcMetrics(serverName,
+                          Integer.toString(this.port), this);
+    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
+
+    // Create the responder here
+    responder = new Responder();
+  }
+
+  private void closeConnection(Connection connection) {
+    synchronized (connectionList) {
+      if (connectionList.remove(connection))
+        numConnections--;
+    }
+    try {
+      connection.close();
+    } catch (IOException e) {
+    }
+  }
+  
+  /** Sets the socket buffer size used for responding to RPCs */
+  public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
+
+  /** Starts the service.  Must be called before any calls will be handled. */
+  public synchronized void start() throws IOException {
+    responder.start();
+    listener.start();
+    handlers = new Handler[handlerCount];
+    
+    for (int i = 0; i < handlerCount; i++) {
+      handlers[i] = new Handler(i);
+      handlers[i].start();
+    }
+  }
+
+  /** Stops the service.  No new calls will be handled after this is called. */
+  public synchronized void stop() {
+    LOG.info("Stopping server on " + port);
+    running = false;
+    if (handlers != null) {
+      for (int i = 0; i < handlerCount; i++) {
+        if (handlers[i] != null) {
+          handlers[i].interrupt();
+        }
+      }
+    }
+    listener.interrupt();
+    listener.doStop();
+    responder.interrupt();
+    notifyAll();
+    if (this.rpcMetrics != null) {
+      this.rpcMetrics.shutdown();
+    }
+  }
+
+  /** Wait for the server to be stopped.
+   * Does not wait for all subthreads to finish.
+   *  See {@link #stop()}.
+   */
+  public synchronized void join() throws InterruptedException {
+    while (running) {
+      wait();
+    }
+  }
+
+  /**
+   * Return the socket (ip+port) on which the RPC server is listening.
+   * @return the socket (ip+port) on which the RPC server is listening
+   */
+  public synchronized InetSocketAddress getListenerAddress() {
+    return listener.getAddress();
+  }
+  
+  /** Called for each call. */
+  public abstract Writable call(Writable param, long receiveTime)
+                                                throws IOException;
+
+  /**
+   * The number of open RPC connections
+   * @return the number of open RPC connections
+   */
+  public int getNumOpenConnections() {
+    return numConnections;
+  }
+  
+  /**
+   * The number of rpc calls in the queue.
+   * @return The number of rpc calls in the queue.
+   */
+  public int getCallQueueLen() {
+    return callQueue.size();
+  }
+  
+}

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java Sun Dec  7 19:20:43 2008
@@ -1,680 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.HBaseClient;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/** A simple RPC mechanism.
- *
- * This is a local hbase copy of the hadoop RPC so we can do things like
- * address HADOOP-414 for hbase-only and try other hbase-specific
- * optimizations like using our own version of ObjectWritable.  Class has been
- * renamed to avoid confusing it w/ hadoop versions.
- * <p>
- * 
- *
- * A <i>protocol</i> is a Java interface.  All parameters and return types must
- * be one of:
- *
- * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
- * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
- * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- *
- * <li>a {@link String}; or</li>
- *
- * <li>a {@link Writable}; or</li>
- *
- * <li>an array of the above types</li> </ul>
- *
- * All methods in the protocol should throw only IOException.  No field data of
- * the protocol instance is transmitted.
- */
-public class HbaseRPC {
-  // Leave this out in the hadoop ipc package but keep class name.  Do this
-  // so that we don't get the logging of this class's invocations by doing our
-  // blanket enabling DEBUG on the o.a.h.h. package.
-  private static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
-
-  private HbaseRPC() {
-    super();
-  }                                  // no public ctor
-
-
-  /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable, Configurable {
-    // Here, for hbase, we maintain two static maps of method names to code and
-    // vice versa.
-    private static final Map<Byte, String> CODE_TO_METHODNAME =
-      new HashMap<Byte, String>();
-    private static final Map<String, Byte> METHODNAME_TO_CODE =
-      new HashMap<String, Byte>();
-    // Special code that means 'not-encoded'.
-    private static final byte NOT_ENCODED = 0;
-    static {
-      byte code = NOT_ENCODED + 1;
-      code = addToMap(VersionedProtocol.class, code);
-      code = addToMap(HMasterInterface.class, code);
-      code = addToMap(HMasterRegionInterface.class, code);
-      code = addToMap(TransactionalRegionInterface.class, code);
-    }
-    // End of hbase modifications.
-
-    private String methodName;
-    @SuppressWarnings("unchecked")
-    private Class[] parameterClasses;
-    private Object[] parameters;
-    private Configuration conf;
-
-    /** default constructor */
-    public Invocation() {
-      super();
-    }
-
-    /**
-     * @param method
-     * @param parameters
-     */
-    public Invocation(Method method, Object[] parameters) {
-      this.methodName = method.getName();
-      this.parameterClasses = method.getParameterTypes();
-      this.parameters = parameters;
-    }
-
-    /** @return The name of the method invoked. */
-    public String getMethodName() { return methodName; }
-
-    /** @return The parameter classes. */
-    @SuppressWarnings("unchecked")
-    public Class[] getParameterClasses() { return parameterClasses; }
-
-    /** @return The parameter instances. */
-    public Object[] getParameters() { return parameters; }
-
-    public void readFields(DataInput in) throws IOException {
-      byte code = in.readByte();
-      methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
-      parameters = new Object[in.readInt()];
-      parameterClasses = new Class[parameters.length];
-      HbaseObjectWritable objectWritable = new HbaseObjectWritable();
-      for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
-          this.conf);
-        parameterClasses[i] = objectWritable.getDeclaredClass();
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      writeMethodNameCode(out, this.methodName);
-      out.writeInt(parameterClasses.length);
-      for (int i = 0; i < parameterClasses.length; i++) {
-        HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
-                                   conf);
-      }
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder buffer = new StringBuilder(256);
-      buffer.append(methodName);
-      buffer.append("(");
-      for (int i = 0; i < parameters.length; i++) {
-        if (i != 0)
-          buffer.append(", ");
-        buffer.append(parameters[i]);
-      }
-      buffer.append(")");
-      return buffer.toString();
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public Configuration getConf() {
-      return this.conf;
-    }
-    
-    // Hbase additions.
-    private static void addToMap(final String name, final byte code) {
-      if (METHODNAME_TO_CODE.containsKey(name)) {
-        return;
-      }
-      METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
-      CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
-    }
-    
-    /*
-     * @param c Class whose methods we'll add to the map of methods to codes
-     * (and vice versa).
-     * @param code Current state of the byte code.
-     * @return State of <code>code</code> when this method is done.
-     */
-    private static byte addToMap(final Class<?> c, final byte code) {
-      byte localCode = code;
-      Method [] methods = c.getMethods();
-      // There are no guarantees about the order in which items are returned in
-      // so do a sort (Was seeing that sort was one way on one server and then
-      // another on different server).
-      Arrays.sort(methods, new Comparator<Method>() {
-        public int compare(Method left, Method right) {
-          return left.getName().compareTo(right.getName());
-        }
-      });
-      for (int i = 0; i < methods.length; i++) {
-        addToMap(methods[i].getName(), localCode++);
-      }
-      return localCode;
-    }
-
-    /*
-     * Write out the code byte for passed Class.
-     * @param out
-     * @param c
-     * @throws IOException
-     */
-    static void writeMethodNameCode(final DataOutput out, final String methodname)
-    throws IOException {
-      Byte code = METHODNAME_TO_CODE.get(methodname);
-      if (code == null) {
-        LOG.error("Unsupported type " + methodname);
-        throw new UnsupportedOperationException("No code for unexpected " +
-          methodname);
-      }
-      out.writeByte(code.byteValue());
-    }
-    // End of hbase additions.
-  }
-
-  /* Cache a client using its socket factory as the hash key */
-  static private class ClientCache {
-    private Map<SocketFactory, Client> clients =
-      new HashMap<SocketFactory, Client>();
-
-    /**
-     * Construct & cache an IPC client with the user-provided SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf,
-        SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for all
-      // configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      Client client = clients.get(factory);
-      if (client == null) {
-        // Make an hbase client instead of hadoop Client.
-        client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
-        clients.put(factory, client);
-      } else {
-        ((HBaseClient)client).incCount();
-      }
-      return client;
-    }
-
-    /**
-     * Construct & cache an IPC client with the default SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf) {
-      return getClient(conf, SocketFactory.getDefault());
-    }
-
-    /**
-     * Stop an RPC client connection.
-     * An RPC client is closed only when its reference count becomes zero.
-     */
-    private void stopClient(Client client) {
-      synchronized (this) {
-        ((HBaseClient)client).decCount();
-        if (((HBaseClient)client).isZeroReference()) {
-          clients.remove(((HBaseClient)client).getSocketFactory());
-        }
-      }
-      if (((HBaseClient)client).isZeroReference()) {
-        client.stop();
-      }
-    }
-  }
-
-  private static ClientCache CLIENTS = new ClientCache();
-  
-  private static class Invoker implements InvocationHandler {
-    private InetSocketAddress address;
-    private UserGroupInformation ticket;
-    private Client client;
-    private boolean isClosed = false;
-
-    /**
-     * @param address
-     * @param ticket
-     * @param conf
-     * @param factory
-     */
-    public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
-                   Configuration conf, SocketFactory factory) {
-      this.address = address;
-      this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory);
-    }
-
-    public Object invoke(@SuppressWarnings("unused") Object proxy,
-        Method method, Object[] args)
-      throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
-      long startTime = 0;
-      if (logDebug) {
-        startTime = System.currentTimeMillis();
-      }
-      HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address, ticket);
-      if (logDebug) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-      }
-      return value.get();
-    }
-    
-    /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
-  }
-
-  /**
-   * A version mismatch for the RPC protocol.
-   */
-  @SuppressWarnings("serial")
-  public static class VersionMismatch extends IOException {
-    private String interfaceName;
-    private long clientVersion;
-    private long serverVersion;
-    
-    /**
-     * Create a version mismatch exception
-     * @param interfaceName the name of the protocol mismatch
-     * @param clientVersion the client's version of the protocol
-     * @param serverVersion the server's version of the protocol
-     */
-    public VersionMismatch(String interfaceName, long clientVersion,
-                           long serverVersion) {
-      super("Protocol " + interfaceName + " version mismatch. (client = " +
-            clientVersion + ", server = " + serverVersion + ")");
-      this.interfaceName = interfaceName;
-      this.clientVersion = clientVersion;
-      this.serverVersion = serverVersion;
-    }
-    
-    /**
-     * Get the interface name
-     * @return the java class name 
-     *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
-     */
-    public String getInterfaceName() {
-      return interfaceName;
-    }
-    
-    /**
-     * @return the client's preferred version
-     */
-    public long getClientVersion() {
-      return clientVersion;
-    }
-    
-    /**
-     * @return the server's agreed to version.
-     */
-    public long getServerVersion() {
-      return serverVersion;
-    }
-  }
-  
-  /**
-   * @param protocol
-   * @param clientVersion
-   * @param addr
-   * @param conf
-   * @param maxAttempts
-   * @return proxy
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public static VersionedProtocol waitForProxy(Class protocol,
-                                               long clientVersion,
-                                               InetSocketAddress addr,
-                                               Configuration conf,
-                                               int maxAttempts
-                                               ) throws IOException {
-    // HBase does limited number of reconnects which is different from hadoop.
-    int reconnectAttempts = 0;
-    while (true) {
-      try {
-        return getProxy(protocol, clientVersion, addr, conf);
-      } catch(ConnectException se) {  // namenode has not been started
-        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
-        if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
-          LOG.info("Server at " + addr + " could not be reached after " +
-                  reconnectAttempts + " tries, giving up.");
-          throw new RetriesExhaustedException(addr.toString(), "unknown".getBytes(),
-                  "unknown".getBytes(), reconnectAttempts - 1,
-                  new ArrayList<Throwable>());
-      }
-      } catch(SocketTimeoutException te) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + addr);
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {
-        // IGNORE
-      }
-    }
-  }
-
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address.
-   *
-   * @param protocol
-   * @param clientVersion
-   * @param addr
-   * @param conf
-   * @param factory
-   * @return proxy
-   * @throws IOException
-   */
-  public static VersionedProtocol getProxy(Class<?> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory);
-  }
-  
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address.
-   *
-   * @param protocol
-   * @param clientVersion
-   * @param addr
-   * @param ticket
-   * @param conf
-   * @param factory
-   * @return proxy
-   * @throws IOException
-   */
-  public static VersionedProtocol getProxy(Class<?> protocol,
-      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory)
-  throws IOException {    
-    VersionedProtocol proxy =
-        (VersionedProtocol) Proxy.newProxyInstance(
-            protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(addr, ticket, conf, factory));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
-                                                  clientVersion);
-    if (serverVersion == clientVersion) {
-      return proxy;
-    } else {
-      throw new VersionMismatch(protocol.getName(), clientVersion, 
-                                serverVersion);
-    }
-  }
-
-  /**
-   * Construct a client-side proxy object with the default SocketFactory
-   * 
-   * @param protocol
-   * @param clientVersion
-   * @param addr
-   * @param conf
-   * @return a proxy instance
-   * @throws IOException
-   */
-  public static VersionedProtocol getProxy(Class<?> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf)
-      throws IOException {
-
-    return getProxy(protocol, clientVersion, addr, conf, NetUtils
-        .getDefaultSocketFactory(conf));
-  }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
-   */
-  public static void stopProxy(VersionedProtocol proxy) {
-    if (proxy!=null) {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-    }
-  }
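
A sketch of the intended proxy lifecycle, assuming `addr` and `conf` are already set up: pair every getProxy with a stopProxy so the underlying Invoker and its client connections get released.

    HRegionInterface region = (HRegionInterface) HBaseRPC.getProxy(
        HRegionInterface.class, HBaseRPCProtocolVersion.versionID, addr, conf);
    try {
      // ... issue RPCs against the region server through the proxy ...
    } finally {
      HBaseRPC.stopProxy(region);  // closes the Invoker's client
    }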
-
-  /**
-   * Expert: Make multiple, parallel calls to a set of servers.
-   *
-   * @param method
-   * @param params
-   * @param addrs
-   * @param conf
-   * @return values
-   * @throws IOException
-   */
-  public static Object[] call(Method method, Object[][] params,
-                              InetSocketAddress[] addrs, Configuration conf)
-    throws IOException {
-
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    Client client = CLIENTS.getClient(conf);
-    try {
-      Writable[] wrappedValues = client.call(invocations, addrs);
-
-      if (method.getReturnType() == Void.TYPE) {
-        return null;
-      }
-
-      Object[] values = (Object[])Array.newInstance(
-        method.getReturnType(), wrappedValues.length);
-      for (int i = 0; i < values.length; i++) {
-        if (wrappedValues[i] != null) {
-          values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-        }
-      }
-
-      return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
-  }
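
A hedged fan-out sketch for the parallel helper above; the getRegionInfo(byte[]) signature is an assumption about HRegionInterface, and serverA, serverB, regionNameA, regionNameB, and conf are placeholders:

    Method method = HRegionInterface.class.getMethod("getRegionInfo", byte[].class);
    Object[][] params = { { regionNameA }, { regionNameB } };  // one row per server
    InetSocketAddress[] addrs = { serverA, serverB };
    // The helper allocates an array of the method's return type and unwraps
    // each HbaseObjectWritable, so the result casts to HRegionInfo[].
    HRegionInfo[] infos = (HRegionInfo[]) HBaseRPC.call(method, params, addrs, conf);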
-
-  /**
-   * Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   *
-   * @param instance
-   * @param bindAddress
-   * @param port
-   * @param conf
-   * @return Server
-   * @throws IOException
-   */
-  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
-    throws IOException {
-    return getServer(instance, bindAddress, port, 1, false, conf);
-  }
-
-  /**
-   * Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   *
-   * @param instance
-   * @param bindAddress
-   * @param port
-   * @param numHandlers
-   * @param verbose
-   * @param conf
-   * @return Server
-   * @throws IOException
-   */
-  public static Server getServer(final Object instance, final String bindAddress, final int port,
-                                 final int numHandlers,
-                                 final boolean verbose, Configuration conf) 
-    throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
-  }
-
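And a hypothetical server-side bring-up mirroring how HMaster uses this factory later in this commit; the bind address, port, and handler count are illustrative only:

    HBaseServer server = HBaseRPC.getServer(instance, "0.0.0.0", 60020,
        10 /* handler threads */, false /* verbose */, conf);
    server.start();
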
-  /** An RPC Server. */
-  public static class Server extends org.apache.hadoop.ipc.Server {
-    private Object instance;
-    private Class<?> implementation;
-    private boolean verbose;
-
-    /**
-     * Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind to and listen on for connections
-     * @param port the port to listen for connections on
-     * @throws IOException
-     */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
-      throws IOException {
-      this(instance, conf,  bindAddress, port, 1, false);
-    }
-    
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind to and listen on for connections
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     * @throws IOException
-     */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
-      this.instance = instance;
-      this.implementation = instance.getClass();
-      this.verbose = verbose;
-    }
-
-    @Override
-    public Writable call(Writable param, long receivedTime) throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-        Method method =
-          implementation.getMethod(call.getMethodName(),
-                                   call.getParameterClasses());
-
-        long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        LOG.debug("Served: " + call.getMethodName() +
-            " queueTime= " + qTime +
-            " processingTime= " + processingTime);
-        rpcMetrics.rpcQueueTime.inc(qTime);
-        rpcMetrics.rpcProcessingTime.inc(processingTime);
-
-        // Lazily create the per-method rate metric on first use.
-        MetricsTimeVaryingRate m =
-          rpcMetrics.metricsList.get(call.getMethodName());
-        if (m == null) {
-          m = new MetricsTimeVaryingRate(call.getMethodName());
-          rpcMetrics.metricsList.put(call.getMethodName(), m);
-        }
-        m.inc(processingTime);
-
-        if (verbose) log("Return: "+value);
-
-        return new HbaseObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
-          throw ioe;
-        }
-      } catch (Throwable e) {
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
-      }
-    }
-  }
-
-  private static void log(String value) {
-    if (value != null && value.length() > 55) {
-      value = value.substring(0, 55) + "...";
-    }
-    LOG.info(value);
-  }
-}

Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html Sun Dec  7 19:20:43 2008
@@ -0,0 +1,24 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Tools to help define network clients and servers.
+This is the Hadoop IPC code copied local so we can fix bugs and make HBase-specific optimizations.
+</body>
+</html>

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java Sun Dec  7 19:20:43 2008
@@ -60,10 +60,11 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -76,7 +77,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
 
 /**
  * HMaster is the "master server" for an HBase cluster.
@@ -91,8 +91,7 @@
   
   static final Log LOG = LogFactory.getLog(HMaster.class.getName());
 
-  public long getProtocolVersion(@SuppressWarnings("unused") String protocol,
-      @SuppressWarnings("unused") long clientVersion) {
+  public long getProtocolVersion(String protocol, long clientVersion) {
     return HBaseRPCProtocolVersion.versionID;
   }
 
@@ -117,7 +116,7 @@
   volatile BlockingQueue<RegionServerOperation> toDoQueue =
     new LinkedBlockingQueue<RegionServerOperation>();
 
-  private final Server server;
+  private final HBaseServer server;
   private final HServerAddress address;
 
   final ServerConnection connection;
@@ -222,7 +221,7 @@
       conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
     this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
     
-    this.server = HbaseRPC.getServer(this, address.getBindAddress(),
+    this.server = HBaseRPC.getServer(this, address.getBindAddress(),
         address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
         false, conf);
 
@@ -530,13 +529,11 @@
   /*
    * HMasterRegionInterface
    */
-
-  @SuppressWarnings("unused")
   public MapWritable regionServerStartup(HServerInfo serverInfo)
   throws IOException {
     // Set the address for now even though it will not be persisted on
     // the HRS side.
-    String rsAddress = Server.getRemoteAddress();
+    String rsAddress = HBaseServer.getRemoteAddress();
     serverInfo.setServerAddress(new HServerAddress
         (rsAddress, serverInfo.getServerAddress().getPort()));
     // register with server manager
@@ -552,7 +549,7 @@
   protected MapWritable createConfigurationSubset() {
     MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
     // Get the real address of the HRS.
-    String rsAddress = Server.getRemoteAddress();
+    String rsAddress = HBaseServer.getRemoteAddress();
     if (rsAddress != null) {
       mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
     }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sun Dec  7 19:20:43 2008
@@ -137,6 +137,13 @@
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   private final Integer updateLock = new Integer(0);
+  
+  /*
+   * If more than this many logs, force flush of oldest region to oldest edit
+   * goes to disk.  If too many and we crash, then will take forever replaying.
+   * Keep the number of logs tidy.
+   */
+  private final int maxLogs;
 
   /**
    * Create an edit log at the given <code>dir</code> location.
@@ -152,10 +159,9 @@
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Configuration conf,
-      final LogRollListener listener) throws IOException {
-    
+    final LogRollListener listener)
+  throws IOException {
     super();
-    
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
@@ -172,6 +178,7 @@
       throw new IOException("Target HLog directory already exists: " + dir);
     }
     fs.mkdirs(dir);
+    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
     rollWriter();
   }
 
@@ -234,14 +241,17 @@
    * cacheFlushLock and then completeCacheFlush could be called which would wait
    * for the lock on this and consequently never release the cacheFlushLock
    *
+   * @return If we have too many logs, the region to flush so that next time
+   * through we can clean more logs; returns null if there is nothing to flush.
    * @throws FailedLogCloseException
    * @throws IOException
    */
-  public void rollWriter() throws FailedLogCloseException, IOException {
+  public byte [] rollWriter() throws FailedLogCloseException, IOException {
+    byte [] regionToFlush = null;
     this.cacheFlushLock.lock();
     try {
       if (closed) {
-        return;
+        return regionToFlush;
       }
       synchronized (updateLock) {
         // Clean up current writer.
@@ -268,7 +278,7 @@
             }
             this.outputfiles.clear();
           } else {
-            cleanOldLogs();
+            regionToFlush = cleanOldLogs();
           }
         }
         this.numEntries = 0;
@@ -277,32 +287,28 @@
     } finally {
       this.cacheFlushLock.unlock();
     }
+    return regionToFlush;
   }
   
   /*
    * Clean up old commit logs.
+   * @return If we have too many logs, the region to flush so that next time
+   * through we can clean more logs; returns null if there is nothing to flush.
    * @throws IOException
    */
-  private void cleanOldLogs() throws IOException {
-    // Get oldest edit/sequence id.  If logs are older than this id,
-    // then safe to remove.
-    Long oldestOutstandingSeqNum =
-      Collections.min(this.lastSeqWritten.values());
+  private byte [] cleanOldLogs() throws IOException {
+    byte [] regionToFlush = null;
+    Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
     // Get the set of all log files whose final ID is older than or
     // equal to the oldest pending region operation
     TreeSet<Long> sequenceNumbers =
       new TreeSet<Long>(this.outputfiles.headMap(
         (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
     // Now remove old log files (if any)
+    byte [] oldestRegion = null;
     if (LOG.isDebugEnabled()) {
       // Find region associated with oldest key -- helps debugging.
-      byte [] oldestRegion = null;
-      for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
-        if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
-          oldestRegion = e.getKey();
-          break;
-        }
-      }
+      oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
       LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
         " out of total " + this.outputfiles.size() + "; " +
         "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
@@ -313,6 +319,33 @@
         deleteLogFile(this.outputfiles.remove(seq), seq);
       }
     }
+    int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
+    if (countOfLogs > this.maxLogs) {
+      regionToFlush = oldestRegion != null?
+        oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
+      LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+        this.maxLogs + "; forcing flush of region with oldest edits: " +
+        Bytes.toString(regionToFlush));
+    }
+    return regionToFlush;
+  }
+
+  /*
+   * @return The oldest outstanding sequence number; logs whose edits all predate it are safe to remove.
+   */
+  private Long getOldestOutstandingSeqNum() {
+    return Collections.min(this.lastSeqWritten.values());
+  }
+
+  private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
+    byte [] oldestRegion = null;
+    for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
+      if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
+        oldestRegion = e.getKey();
+        break;
+      }
+    }
+    return oldestRegion;
   }
 
   /*

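To make the cleaning rule concrete, a small sketch with assumed sequence ids; the TreeMap keys stand in for each log's final edit id, as in outputfiles above (java.util and org.apache.hadoop.fs.Path imports assumed):

    SortedMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
    outputfiles.put(100L, new Path("hlog.100"));  // final edit id 100
    outputfiles.put(150L, new Path("hlog.150"));  // final edit id 150
    long oldestOutstandingSeqNum = 120L;          // oldest un-flushed edit
    // headMap is exclusive, hence the +1: logs whose final id is <= 120 can go.
    Set<Long> deletable =
      outputfiles.headMap(oldestOutstandingSeqNum + 1L).keySet();  // {100}
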
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec  7 19:20:43 2008
@@ -72,8 +72,8 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.UnknownRowLockException;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.ValueOverMaxLengthException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.client.ServerConnection;
@@ -84,10 +84,11 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -96,7 +97,6 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -154,7 +154,7 @@
 
   // Server to handle client requests.  Default access so can be accessed by
   // unit tests.
-  final Server server;
+  final HBaseServer server;
   
   // Leases
   private final Leases leases;
@@ -258,7 +258,7 @@
     this.workerThread = new Thread(worker);
 
     // Server to handle client requests
-    this.server = HbaseRPC.getServer(this, address.getBindAddress(), 
+    this.server = HBaseRPC.getServer(this, address.getBindAddress(), 
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
       false, conf);
      // Address is given a default IP for the moment. Will be changed after
@@ -518,7 +518,7 @@
         serverInfo.getServerAddress().toString());
     }
     if (this.hbaseMaster != null) {
-      HbaseRPC.stopProxy(this.hbaseMaster);
+      HBaseRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
     join();
@@ -959,7 +959,7 @@
       try {
         // Do initial RPC setup.  The final argument indicates that the RPC
         // should retry indefinitely.
-        master = (HMasterRegionInterface)HbaseRPC.waitForProxy(
+        master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
             HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
             new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
             this.conf, -1);

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Sun Dec  7 19:20:43 2008
@@ -823,7 +823,7 @@
       List<HStoreFile> filesToCompact = null;
       synchronized (storefiles) {
         if (this.storefiles.size() <= 0) {
-          LOG.debug("no store files to compact");
+          LOG.debug(this.storeNameStr + ": no store files to compact");
           return null;
         }
         // filesToCompact are sorted oldest to newest.

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java Sun Dec  7 19:20:43 2008
@@ -48,7 +48,6 @@
     }
   }
 
-  @Override
   protected void chore() {
     synchronized (log) {
       HLog hlog = log.get();
@@ -57,4 +56,4 @@
       }
     }
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Sun Dec  7 19:20:43 2008
@@ -26,9 +26,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Runs periodically to determine if the HLog should be rolled
+ * Runs periodically to determine if the HLog should be rolled.
  * 
  * NOTE: This class extends Thread rather than Chore because the sleep time
  * can be interrupted when there is something to do, rather than the Chore
@@ -61,7 +62,10 @@
       }
       rollLock.lock();          // Don't interrupt us. We're working
       try {
-        server.getLog().rollWriter();
+        byte [] regionToFlush = server.getLog().rollWriter();
+        if (regionToFlush != null) {
+          scheduleFlush(regionToFlush);
+        }
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);
         server.abort();
@@ -79,6 +83,23 @@
     }
     LOG.info("LogRoller exiting.");
   }
+  
+  private void scheduleFlush(final byte [] region) {
+    boolean scheduled = false;
+    HRegion r = this.server.getOnlineRegion(region);
+    FlushRequester requester = null;
+    if (r != null) {
+      requester = this.server.getFlushRequester();
+      if (requester != null) {
+        requester.request(r);
+        scheduled = true;
+      }
+    }
+    if (!scheduled) {
+      LOG.warn("Failed to schedule flush of " +
+        Bytes.toString(region) + ", r=" + r + ", requester=" + requester);
+    }
+  }
 
   public void logRollRequested() {
     synchronized (rollLog) {

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Sun Dec  7 19:20:43 2008
@@ -20,20 +20,22 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
-import java.util.HashSet;
-import java.util.SortedMap;
-import java.util.ConcurrentModificationException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread that flushes cache on request
@@ -58,23 +60,62 @@
   protected final long globalMemcacheLimit;
   protected final long globalMemcacheLimitLowMark;
   
+  public static final float DEFAULT_UPPER = 0.4f;
+  public static final float DEFAULT_LOWER = 0.25f;
+  public static final String UPPER_KEY =
+    "hbase.regionserver.globalMemcache.upperLimit";
+  public static final String LOWER_KEY =
+    "hbase.regionserver.globalMemcache.lowerLimit";
+  
   /**
    * @param conf
    * @param server
    */
-  public MemcacheFlusher(final HBaseConfiguration conf, final HRegionServer server) {
+  public MemcacheFlusher(final HBaseConfiguration conf,
+      final HRegionServer server) {
     super();
     this.server = server;
-    threadWakeFrequency = conf.getLong(
-        HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
-        
-    // default memcache limit of 512MB
-    globalMemcacheLimit = 
-      conf.getLong("hbase.regionserver.globalMemcacheLimit", 512 * 1024 * 1024);
-    // default memcache low mark limit of 256MB, which is half the upper limit
-    globalMemcacheLimitLowMark = 
-      conf.getLong("hbase.regionserver.globalMemcacheLimitLowMark", 
-        globalMemcacheLimit / 2);        
+    this.threadWakeFrequency =
+      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    this.globalMemcacheLimit = globalMemcacheLimit(max, DEFAULT_UPPER,
+      UPPER_KEY, conf);
+    long lower = globalMemcacheLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
+    if (lower > this.globalMemcacheLimit) {
+      lower = this.globalMemcacheLimit;
+      LOG.info("Setting globalMemcacheLimitLowMark == globalMemcacheLimit " +
+        "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
+    }
+    this.globalMemcacheLimitLowMark = lower;
+    LOG.info("globalMemcacheLimit=" +
+      StringUtils.humanReadableInt(this.globalMemcacheLimit) +
+      ", globalMemcacheLimitLowMark=" +
+      StringUtils.humanReadableInt(this.globalMemcacheLimitLowMark) +
+      ", maxHeap=" + StringUtils.humanReadableInt(max));
+  }
+
+  /**
+   * Calculate the memcache limit as the percentage of <code>max</code>
+   * configured under <code>key</code>.
+   * @param max
+   * @param defaultLimit
+   * @param key
+   * @param c
+   * @return Limit.
+   */
+  static long globalMemcacheLimit(final long max,
+     final float defaultLimit, final String key, final HBaseConfiguration c) {
+    float limit = c.getFloat(key, defaultLimit);
+    return getMemcacheLimit(max, limit, defaultLimit);
+  }
+  
+  static long getMemcacheLimit(final long max, final float limit,
+      final float defaultLimit) {
+    if (limit >= 0.9f || limit < 0.1f) {
+      LOG.warn("Setting global memcache limit to default of " + defaultLimit +
+        " because supplied value outside allowed range of 0.1 -> 0.9");
+      limit = defaultLimit;
+    }
+    return (long)(max * limit);
   }
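
Worked arithmetic for the new heap-relative limits, under an assumed 1 GB max heap:

    long maxHeap = 1024L * 1024L * 1024L;  // stand-in for getHeapMemoryUsage().getMax()
    long upper = (long)(maxHeap * 0.4f);   // DEFAULT_UPPER  -> ~410 MB flush ceiling
    long lower = (long)(maxHeap * 0.25f);  // DEFAULT_LOWER  -> 256 MB low-water mark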
   
   @Override
@@ -208,15 +249,14 @@
    * to the lower limit. This method blocks callers until we're down to a safe
    * amount of memcache consumption.
    */
-  public void reclaimMemcacheMemory() {
+  public synchronized void reclaimMemcacheMemory() {
     if (server.getGlobalMemcacheSize() >= globalMemcacheLimit) {
       flushSomeRegions();
     }
   }
 
   /*
-   * Emergency!  Need to flush memory.  While running this method all updates
-   * to this regionserver are blocked.
+   * Emergency!  Need to flush memory.
    */
   private synchronized void flushSomeRegions() {
     // keep flushing until we hit the low water mark
@@ -228,19 +268,23 @@
       // flush the region with the biggest memcache
       if (m.size() <= 0) {
        LOG.info("No online regions to flush though we've been asked to flush " +
-            "some; globalMemcacheSize=" + globalMemcacheSize +
-            ", globalMemcacheLimitLowMark=" + this.globalMemcacheLimitLowMark);
+          "some; globalMemcacheSize=" +
+          StringUtils.humanReadableInt(globalMemcacheSize) +
+          ", globalMemcacheLimitLowMark=" +
+          StringUtils.humanReadableInt(this.globalMemcacheLimitLowMark));
         break;
       }
       HRegion biggestMemcacheRegion = m.remove(m.firstKey());
       LOG.info("Forced flushing of " +  biggestMemcacheRegion.toString() +
-        " because global memcache limit of " + this.globalMemcacheLimit +
-        " exceeded; currenly " + globalMemcacheSize + " and flushing till " +
-        this.globalMemcacheLimitLowMark);
+        " because global memcache limit of " +
+        StringUtils.humanReadableInt(this.globalMemcacheLimit) +
+        " exceeded; currently " +
+        StringUtils.humanReadableInt(globalMemcacheSize) + " and flushing till " +
+        StringUtils.humanReadableInt(this.globalMemcacheLimitLowMark));
       if (!flushRegion(biggestMemcacheRegion, true)) {
-        // Something bad happened - give up.
+        LOG.warn("Flush failed");
         break;
       }
     }
   }
-}
\ No newline at end of file
+}


