hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r775039 - in /hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop: hbase/client/ hbase/ipc/ hbase/master/ hbase/regionserver/ ipc/
Date Fri, 15 May 2009 07:05:26 GMT
Author: apurtell
Date: Fri May 15 07:05:26 2009
New Revision: 775039

URL: http://svn.apache.org/viewvc?rev=775039&view=rev
Log:
roll back RPC bits to 0.18 version, then bring forward just a bit with versioned interface,
error handler, and HBaseObjectWritable

Added:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java
Removed:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri May 15 07:05:26 2009
@@ -777,7 +777,7 @@
             server = (HRegionInterface)HBaseRPC.waitForProxy(
                 serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
                 regionServer.getInetSocketAddress(), this.conf, 
-                this.maxRPCAttempts, this.rpcTimeout);
+                this.maxRPCAttempts);
           } catch (RemoteException e) {
             throw RemoteExceptionHandler.decodeRemoteException(e);
           }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri May 15 07:05:26 2009
@@ -21,7 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -29,20 +28,9 @@
 import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map;
 
 import javax.net.SocketFactory;
@@ -53,11 +41,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.HBaseClient;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 
 /** A simple RPC mechanism.
  *
@@ -88,33 +79,14 @@
   // Leave this out in the hadoop ipc package but keep class name.  Do this
   // so that we dont' get the logging of this class's invocations by doing our
   // blanket enabling DEBUG on the o.a.h.h. package.
-  protected static final Log LOG =
+  private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
 
-  private HBaseRPC() {
-    super();
-  }                                  // no public ctor
+  private HBaseRPC() {}                                  // no public ctor
 
 
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
-    // Here, for hbase, we maintain two static maps of method names to code and
-    // vice versa.
-    private static final Map<Byte, String> CODE_TO_METHODNAME =
-      new HashMap<Byte, String>();
-    private static final Map<String, Byte> METHODNAME_TO_CODE =
-      new HashMap<String, Byte>();
-    // Special code that means 'not-encoded'.
-    private static final byte NOT_ENCODED = 0;
-    static {
-      byte code = NOT_ENCODED + 1;
-      code = addToMap(VersionedProtocol.class, code);
-      code = addToMap(HMasterInterface.class, code);
-      code = addToMap(HMasterRegionInterface.class, code);
-      code = addToMap(TransactionalRegionInterface.class, code);
-    }
-    // End of hbase modifications.
-
     private String methodName;
     @SuppressWarnings("unchecked")
     private Class[] parameterClasses;
@@ -122,9 +94,7 @@
     private Configuration conf;
 
     /** default constructor */
-    public Invocation() {
-      super();
-    }
+    public Invocation() {}
 
     /**
      * @param method
@@ -147,20 +117,18 @@
     public Object[] getParameters() { return parameters; }
 
     public void readFields(DataInput in) throws IOException {
-      byte code = in.readByte();
-      methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
+      methodName = Text.readString(in);
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
       HbaseObjectWritable objectWritable = new HbaseObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
-          this.conf);
+        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
     public void write(DataOutput out) throws IOException {
-      writeMethodNameCode(out, this.methodName);
+      Text.writeString(out, methodName);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
         HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -170,7 +138,7 @@
 
     @Override
     public String toString() {
-      StringBuilder buffer = new StringBuilder(256);
+      StringBuffer buffer = new StringBuffer();
       buffer.append(methodName);
       buffer.append("(");
       for (int i = 0; i < parameters.length; i++) {
@@ -189,64 +157,13 @@
     public Configuration getConf() {
       return this.conf;
     }
-    
-    // Hbase additions.
-    private static void addToMap(final String name, final byte code) {
-      if (METHODNAME_TO_CODE.containsKey(name)) {
-        return;
-      }
-      METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
-      CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
-    }
-    
-    /*
-     * @param c Class whose methods we'll add to the map of methods to codes
-     * (and vice versa).
-     * @param code Current state of the byte code.
-     * @return State of <code>code</code> when this method is done.
-     */
-    private static byte addToMap(final Class<?> c, final byte code) {
-      byte localCode = code;
-      Method [] methods = c.getMethods();
-      // There are no guarantees about the order in which items are returned in
-      // so do a sort (Was seeing that sort was one way on one server and then
-      // another on different server).
-      Arrays.sort(methods, new Comparator<Method>() {
-        public int compare(Method left, Method right) {
-          return left.getName().compareTo(right.getName());
-        }
-      });
-      for (int i = 0; i < methods.length; i++) {
-        addToMap(methods[i].getName(), localCode++);
-      }
-      return localCode;
-    }
 
-    /*
-     * Write out the code byte for passed Class.
-     * @param out
-     * @param c
-     * @throws IOException
-     */
-    static void writeMethodNameCode(final DataOutput out, final String methodname)
-    throws IOException {
-      Byte code = METHODNAME_TO_CODE.get(methodname);
-      if (code == null) {
-        LOG.error("Unsupported type " + methodname);
-        throw new UnsupportedOperationException("No code for unexpected " +
-          methodname);
-      }
-      out.writeByte(code.byteValue());
-    }
-    // End of hbase additions.
   }
 
   /* Cache a client using its socket factory as the hash key */
   static private class ClientCache {
-    private Map<SocketFactory, HBaseClient> clients =
-      new HashMap<SocketFactory, HBaseClient>();
-
-    protected ClientCache() {}
+    private Map<SocketFactory, Client> clients =
+      new HashMap<SocketFactory, Client>();
 
     /**
      * Construct & cache an IPC client with the user-provided SocketFactory 
@@ -255,20 +172,19 @@
      * @param conf Configuration
      * @return an IPC client
      */
-    protected synchronized HBaseClient getClient(Configuration conf,
+    private synchronized Client getClient(Configuration conf,
         SocketFactory factory) {
       // Construct & cache client.  The configuration is only used for timeout,
       // and Clients have connection pools.  So we can either (a) lose some
       // connection pooling and leak sockets, or (b) use the same timeout for all
       // configurations.  Since the IPC is usually intended globally, not
       // per-job, we choose (a).
-      HBaseClient client = clients.get(factory);
+      Client client = clients.get(factory);
       if (client == null) {
-        // Make an hbase client instead of hadoop Client.
         client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
         clients.put(factory, client);
       } else {
-        client.incCount();
+        ((HBaseClient)client).incCount();
       }
       return client;
     }
@@ -280,7 +196,7 @@
      * @param conf Configuration
      * @return an IPC client
      */
-    protected synchronized HBaseClient getClient(Configuration conf) {
+    private synchronized Client getClient(Configuration conf) {
       return getClient(conf, SocketFactory.getDefault());
     }
 
@@ -288,25 +204,25 @@
      * Stop a RPC client connection 
      * A RPC client is closed only when its reference count becomes zero.
      */
-    protected void stopClient(HBaseClient client) {
+    private void stopClient(Client client) {
       synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
+        ((HBaseClient)client).decCount();
+        if (((HBaseClient)client).isZeroReference()) {
+          clients.remove(((HBaseClient)client).getSocketFactory());
         }
       }
-      if (client.isZeroReference()) {
+      if (((HBaseClient)client).isZeroReference()) {
         client.stop();
       }
     }
   }
 
-  protected final static ClientCache CLIENTS = new ClientCache();
+  private static ClientCache CLIENTS=new ClientCache();
   
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
     private UserGroupInformation ticket;
-    private HBaseClient client;
+    private Client client;
     private boolean isClosed = false;
 
     /**
@@ -324,22 +240,16 @@
 
     public Object invoke(Object proxy, Method method, Object[] args)
         throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
-      long startTime = 0;
-      if (logDebug) {
-        startTime = System.currentTimeMillis();
-      }
+      long startTime = System.currentTimeMillis();
       HbaseObjectWritable value = (HbaseObjectWritable)
         client.call(new Invocation(method, args), address, ticket);
-      if (logDebug) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-      }
+      long callTime = System.currentTimeMillis() - startTime;
+      LOG.debug("Call: " + method.getName() + " " + callTime);
       return value.get();
     }
     
     /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized protected void close() {
+    synchronized private void close() {
       if (!isClosed) {
         isClosed = true;
         CLIENTS.stopClient(client);
@@ -409,19 +319,14 @@
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,
-                                               int maxAttempts,
-                                               long timeout
+                                               int maxAttempts
                                                ) throws IOException {
-    // HBase does limited number of reconnects which is different from hadoop.
-    long startTime = System.currentTimeMillis();
-    IOException ioe;
     int reconnectAttempts = 0;
     while (true) {
       try {
         return getProxy(protocol, clientVersion, addr, conf);
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
-        ioe = se;
         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
           LOG.info("Server at " + addr + " could not be reached after " +
                   reconnectAttempts + " tries, giving up.");
@@ -431,14 +336,7 @@
       }
       } catch(SocketTimeoutException te) {  // namenode is busy
         LOG.info("Problem connecting to server: " + addr);
-        ioe = te;
       }
-      // check if timed out
-      if (System.currentTimeMillis()-timeout >= startTime) {
-        throw ioe;
-      }
-
-      // wait for retry
       try {
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
@@ -480,8 +378,8 @@
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory)
-  throws IOException {    
+      Configuration conf, SocketFactory factory) throws IOException {    
+
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
@@ -490,9 +388,10 @@
                                                   clientVersion);
     if (serverVersion == clientVersion) {
       return proxy;
+    } else {
+      throw new VersionMismatch(protocol.getName(), clientVersion, 
+                                serverVersion);
     }
-    throw new VersionMismatch(protocol.getName(), clientVersion, 
-                              serverVersion);
   }
 
   /**
@@ -540,7 +439,7 @@
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
-    HBaseClient client = CLIENTS.getClient(conf);
+    Client client = CLIENTS.getClient(conf);
     try {
     Writable[] wrappedValues = client.call(invocations, addrs);
     
@@ -597,10 +496,11 @@
   }
 
   /** An RPC Server. */
-  public static class Server extends HBaseServer {
+  public static class Server extends org.apache.hadoop.ipc.Server {
     private Object instance;
     private Class<?> implementation;
     private boolean verbose;
+    private HBaseRPCErrorHandler handler;
 
     /**
      * Construct an RPC server.
@@ -645,6 +545,7 @@
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
+        
         Method method =
           implementation.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
@@ -653,16 +554,23 @@
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
+        LOG.debug("Served: " + call.getMethodName() +
             " queueTime= " + qTime +
             " procesingTime= " + processingTime);
-          rpcMetrics.rpcQueueTime.inc(qTime);
-          rpcMetrics.rpcProcessingTime.inc(processingTime);
-        }
         rpcMetrics.rpcQueueTime.inc(qTime);
         rpcMetrics.rpcProcessingTime.inc(processingTime);
-        rpcMetrics.inc(call.getMethodName(), processingTime);
+
+        MetricsTimeVaryingRate m = rpcMetrics.metricsList.get(call.getMethodName());
+
+        if (m != null) {
+          m.inc(processingTime);
+        } else {
+          rpcMetrics.metricsList.put(call.getMethodName(), 
+            new MetricsTimeVaryingRate(call.getMethodName()));
+          m = rpcMetrics.metricsList.get(call.getMethodName());
+          m.inc(processingTime);
+        }
+
         if (verbose) log("Return: "+value);
 
         return new HbaseObjectWritable(method.getReturnType(), value);
@@ -671,9 +579,19 @@
         Throwable target = e.getTargetException();
         if (target instanceof IOException) {
           throw (IOException)target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
+        }
+      } catch (OutOfMemoryError e) {
+        if (handler != null) {
+          if (handler.checkOOME(e)) {
+            this.stop();
+          }
         }
-        IOException ioe = new IOException(target.toString());
-        ioe.setStackTrace(target.getStackTrace());
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
         throw ioe;
       } catch (Throwable e) {
         IOException ioe = new IOException(e.toString());
@@ -681,283 +599,15 @@
         throw ioe;
       }
     }
-  }
 
-  protected static void log(String value) {
-    String v = value;
-    if (v != null && v.length() > 55)
-      v = v.substring(0, 55)+"...";
-    LOG.info(v);
-  }
-
-  public static void connect(Socket socket, 
-      SocketAddress endpoint, 
-      int timeout) throws IOException {
-    if (socket == null || endpoint == null || timeout < 0) {
-      throw new IllegalArgumentException("Illegal argument for connect()");
-    }
-    SocketChannel ch = socket.getChannel();
-    if (ch == null) {
-      // let the default implementation handle it.
-      socket.connect(endpoint, timeout);
-    } else {
-      connect(ch, endpoint, timeout);
+    public void setErrorHandler(HBaseRPCErrorHandler handler) {
+      this.handler = handler;
     }
   }
 
-  private static SelectorPool selector = new SelectorPool();
-
-  public static void connect(SocketChannel channel, SocketAddress endpoint,
-      int timeout) throws IOException {
-    
-    boolean blockingOn = channel.isBlocking();
-    if (blockingOn) {
-      channel.configureBlocking(false);
-    }
-    
-    try { 
-      if (channel.connect(endpoint)) {
-        return;
-      }
-
-      long timeoutLeft = timeout;
-      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
-      
-      while (true) {
-        // we might have to call finishConnect() more than once
-        // for some channels (with user level protocols)
-        
-        int ret = selector.select((SelectableChannel)channel, 
-                                  SelectionKey.OP_CONNECT, timeoutLeft);
-        
-        if (ret > 0 && channel.finishConnect()) {
-          return;
-        }
-        
-        if (ret == 0 ||
-            (timeout > 0 &&  
-              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
-          throw new SocketTimeoutException(
-                    timeoutExceptionString(channel, timeout, 
-                                           SelectionKey.OP_CONNECT));
-        }
-      }
-    } catch (IOException e) {
-      // javadoc for SocketChannel.connect() says channel should be closed.
-      try {
-        channel.close();
-      } catch (IOException ignored) {}
-      throw e;
-    } finally {
-      if (blockingOn && channel.isOpen()) {
-        channel.configureBlocking(true);
-      }
-    }
+  private static void log(String value) {
+    if (value!= null && value.length() > 55)
+      value = value.substring(0, 55)+"...";
+    LOG.info(value);
   }
-
-  private static String timeoutExceptionString(SelectableChannel channel,
-      long timeout, int ops) {
-
-    String waitingFor;
-    switch(ops) {
-
-    case SelectionKey.OP_READ :
-      waitingFor = "read"; break;
-
-    case SelectionKey.OP_WRITE :
-      waitingFor = "write"; break;      
-
-    case SelectionKey.OP_CONNECT :
-      waitingFor = "connect"; break;
-
-    default :
-      waitingFor = "" + ops;  
-    }
-
-    return timeout + " millis timeout while " +
-      "waiting for channel to be ready for " + 
-      waitingFor + ". ch : " + channel;    
-  }
-
-  /**
-   * This maintains a pool of selectors. These selectors are closed
-   * once they are idle (unused) for a few seconds.
-   */
-  private static class SelectorPool {
-    
-    private static class SelectorInfo {
-      Selector              selector;
-      long                  lastActivityTime;
-      LinkedList<SelectorInfo> queue; 
-      
-      void close() {
-        if (selector != null) {
-          try {
-            selector.close();
-          } catch (IOException e) {
-            LOG.warn("Unexpected exception while closing selector : " +
-                     StringUtils.stringifyException(e));
-          }
-        }
-      }    
-    }
-    
-    private static class ProviderInfo {
-      SelectorProvider provider;
-      LinkedList<SelectorInfo> queue; // lifo
-      ProviderInfo next;
-    }
-    
-    private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
-    
-    private ProviderInfo providerList = null;
-    
-    /**
-     * Waits on the channel with the given timeout using one of the 
-     * cached selectors. It also removes any cached selectors that are
-     * idle for a few seconds.
-     * 
-     * @param channel
-     * @param ops
-     * @param timeout
-     * @return
-     * @throws IOException
-     */
-    int select(SelectableChannel channel, int ops, long timeout) 
-                                                   throws IOException {
-     
-      SelectorInfo info = get(channel);
-      
-      SelectionKey key = null;
-      int ret = 0;
-      
-      try {
-        while (true) {
-          long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
-          key = channel.register(info.selector, ops);
-          ret = info.selector.select(timeout);
-          
-          if (ret != 0) {
-            return ret;
-          }
-          
-          /* Sometimes select() returns 0 much before timeout for 
-           * unknown reasons. So select again if required.
-           */
-          if (timeout > 0) {
-            timeout -= System.currentTimeMillis() - start;
-            if (timeout <= 0) {
-              return 0;
-            }
-          }
-          
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedIOException("Interruped while waiting for " +
-                                             "IO on channel " + channel +
-                                             ". " + timeout + 
-                                             " millis timeout left.");
-          }
-        }
-      } finally {
-        if (key != null) {
-          key.cancel();
-        }
-        
-        //clear the canceled key.
-        try {
-          info.selector.selectNow();
-        } catch (IOException e) {
-          LOG.info("Unexpected Exception while clearing selector : " +
-                   StringUtils.stringifyException(e));
-          // don't put the selector back.
-          info.close();
-          return ret; 
-        }
-        
-        release(info);
-      }
-    }
-    
-    /**
-     * Takes one selector from end of LRU list of free selectors.
-     * If there are no selectors awailable, it creates a new selector.
-     * Also invokes trimIdleSelectors(). 
-     * 
-     * @param channel
-     * @return 
-     * @throws IOException
-     */
-    private synchronized SelectorInfo get(SelectableChannel channel) 
-                                                         throws IOException {
-      SelectorInfo selInfo = null;
-      
-      SelectorProvider provider = channel.provider();
-      
-      // pick the list : rarely there is more than one provider in use.
-      ProviderInfo pList = providerList;
-      while (pList != null && pList.provider != provider) {
-        pList = pList.next;
-      }      
-      if (pList == null) {
-        //LOG.info("Creating new ProviderInfo : " + provider.toString());
-        pList = new ProviderInfo();
-        pList.provider = provider;
-        pList.queue = new LinkedList<SelectorInfo>();
-        pList.next = providerList;
-        providerList = pList;
-      }
-      
-      LinkedList<SelectorInfo> queue = pList.queue;
-      
-      if (queue.isEmpty()) {
-        Selector selector = provider.openSelector();
-        selInfo = new SelectorInfo();
-        selInfo.selector = selector;
-        selInfo.queue = queue;
-      } else {
-        selInfo = queue.removeLast();
-      }
-      
-      trimIdleSelectors(System.currentTimeMillis());
-      return selInfo;
-    }
-    
-    /**
-     * puts selector back at the end of LRU list of free selectos.
-     * Also invokes trimIdleSelectors().
-     * 
-     * @param info
-     */
-    private synchronized void release(SelectorInfo info) {
-      long now = System.currentTimeMillis();
-      trimIdleSelectors(now);
-      info.lastActivityTime = now;
-      info.queue.addLast(info);
-    }
-    
-    /**
-     * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
-     * traverse the whole list, just over the one that have crossed 
-     * the timeout.
-     */
-    private void trimIdleSelectors(long now) {
-      long cutoff = now - IDLE_TIMEOUT;
-      
-      for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
-        if (pList.queue.isEmpty()) {
-          continue;
-        }
-        for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
-          SelectorInfo info = it.next();
-          if (info.lastActivityTime > cutoff) {
-            break;
-          }
-          it.remove();
-          info.close();
-        }
-      }
-    }
-  }
-
 }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java Fri May 15 07:05:26 2009
@@ -64,10 +64,10 @@
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.HBaseRPC.Server;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -123,7 +123,7 @@
   volatile BlockingQueue<RegionServerOperation> toDoQueue =
     new LinkedBlockingQueue<RegionServerOperation>();
 
-  private final HBaseServer server;
+  private final Server server;
   private final HServerAddress address;
 
   final ServerConnection connection;
@@ -568,7 +568,7 @@
     // use the IP given by the user.
     if (serverInfo.getServerAddress().getBindAddress().equals(
         DEFAULT_HOST)) {
-      String rsAddress = HBaseServer.getRemoteAddress();
+      String rsAddress = Server.getRemoteAddress();
       serverInfo.setServerAddress(new HServerAddress(rsAddress,
       serverInfo.getServerAddress().getPort()));
     }
@@ -585,7 +585,7 @@
   protected MapWritable createConfigurationSubset() {
     MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
     // Get the real address of the HRS.
-    String rsAddress = HBaseServer.getRemoteAddress();
+    String rsAddress = Server.getRemoteAddress();
     if (rsAddress != null) {
       mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
     }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 15 07:05:26 2009
@@ -89,9 +89,9 @@
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.HBaseRPC.Server;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -168,7 +168,7 @@
 
   // Server to handle client requests.  Default access so can be accessed by
   // unit tests.
-  HBaseServer server;
+  Server server;
   
   // Leases
   private Leases leases;
@@ -1296,8 +1296,7 @@
         // should retry indefinitely.
         master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
             HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
-            masterAddress.getInetSocketAddress(),
-            this.conf, -1, this.rpcTimeout);
+            masterAddress.getInetSocketAddress(), this.conf, -1);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java?rev=775039&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java Fri May 15 07:05:26 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Subclass of hadoop's Client just so we can make some methods accessible
+ * in {@link org.apache.hadoop.hbase.ipc.HbaseRPC}
+ */
+public class HBaseClient extends Client {
+  /**
+   * @param valueClass
+   * @param conf
+   * @param factory
+   */
+  public HBaseClient(Class valueClass, Configuration conf, SocketFactory factory) {
+    super(valueClass, conf, factory);
+  }
+
+  /**
+   * @param valueClass
+   * @param conf
+   */
+  public HBaseClient(Class<?> valueClass, Configuration conf) {
+    super(valueClass, conf);
+  }
+  
+  @Override
+  public void incCount() {
+    super.incCount();
+  }
+  
+  @Override
+  public void decCount() {
+    super.decCount();
+  }
+  
+  @Override
+  public boolean isZeroReference() {
+    return super.isZeroReference();
+  }
+  
+  @Override
+  public SocketFactory getSocketFactory() {
+    return super.getSocketFactory();
+  }
+}
\ No newline at end of file



Mime
View raw message