hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject svn commit: r1001066 [1/2] - in /hbase/branches/0.89.20100924: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/executor/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main...
Date Fri, 24 Sep 2010 20:48:56 GMT
Author: jdcryans
Date: Fri Sep 24 20:48:55 2010
New Revision: 1001066

URL: http://svn.apache.org/viewvc?rev=1001066&view=rev
Log:
 HBASE-3008  Memstore.updateColumnValue passes wrong flag to heapSizeChange
 HBASE-3035  Bandaid for HBASE-2990
 HBASE-2643  Figure how to deal with eof splitting logs
 HBASE-2941  port HADOOP-6713 - threading scalability for RPC reads - to HBase
 HBASE-3006  Reading compressed HFile blocks causes way too many DFS RPC calls
             severely impacting performance
 HBASE-2989  [replication] RSM won't cleanup after locking if 0 peers
 HBASE-2992  [replication] MalformedObjectNameException in ReplicationMetrics
 HBASE-3034  Revert the regions assignment part of HBASE-2694 (and pals) for 0.89
 HBASE-3033  [replication] ReplicationSink.replicateEntries improvements
 HBASE-2997  Performance fixes - profiler driven
 HBASE-2889  Tool to look at HLogs -- parse and tail -f (patch #2 only)

Modified:
    hbase/branches/0.89.20100924/CHANGES.txt
    hbase/branches/0.89.20100924/pom.xml
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
    hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
    hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
    hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
    hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java
    hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
    hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.89.20100924/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/CHANGES.txt?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/CHANGES.txt (original)
+++ hbase/branches/0.89.20100924/CHANGES.txt Fri Sep 24 20:48:55 2010
@@ -1,5 +1,5 @@
 HBase Change Log
-Release 0.89.20100830 - Mon Sep 13 10:01:42 PDT 2010
+Release 0.89.20100924 - Unreleased
   INCOMPATIBLE CHANGES
    HBASE-1822  Remove the deprecated APIs
    HBASE-1848  Fixup shell for HBASE-1822
@@ -492,6 +492,15 @@ Release 0.89.20100830 - Mon Sep 13 10:01
    HBASE-2967  Failed split: IOE 'File is Corrupt!' -- sync length not being
                written out to SequenceFile
    HBASE-2975  DFSClient names in master and RS should be unique
+   HBASE-3008  Memstore.updateColumnValue passes wrong flag to heapSizeChange
+   HBASE-3035  Bandaid for HBASE-2990
+   HBASE-2643  Figure how to deal with eof splitting logs
+   HBASE-2941  port HADOOP-6713 - threading scalability for RPC reads - to HBase
+   HBASE-3006  Reading compressed HFile blocks causes way too many DFS RPC calls
+               severly impacting performance
+   HBASE-2989  [replication] RSM won't cleanup after locking if 0 peers
+   HBASE-2992  [replication] MalformedObjectNameException in ReplicationMetrics
+   HBASE-2889  Tool to look at HLogs -- parse and tail -f (patch #2 only)
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable
@@ -859,6 +868,8 @@ Release 0.89.20100830 - Mon Sep 13 10:01
    HBASE-2904  Smart seeking using filters (Pranav via Ryan)
    HBASE-2922  HLog preparation and cleanup are done under the updateLock, 
                major slowdown
+   HBASE-3034  Revert the regions assignment part of HBASE-2694 (and pals) for 0.89
+   HBASE-3033  [replication] ReplicationSink.replicateEntries improvements
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts
@@ -910,6 +921,7 @@ Release 0.89.20100830 - Mon Sep 13 10:01
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite
    HBASE-2041  Change WAL default configuration values
+   HBASE-2997  Performance fixes - profiler driven
 
 
 Release 0.20.0 - Tue Sep  8 12:53:05 PDT 2009

Modified: hbase/branches/0.89.20100924/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/pom.xml?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/pom.xml (original)
+++ hbase/branches/0.89.20100924/pom.xml Fri Sep 24 20:48:55 2010
@@ -476,7 +476,7 @@
   <properties>
     <compileSource>1.6</compileSource>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <hbase.version>0.89.20100830</hbase.version>
+    <hbase.version>0.89.20100924</hbase.version>
     <hadoop.version>0.20.3-append-r964955-1240</hadoop.version>
 
     <commons-cli.version>1.2</commons-cli.version>

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Fri Sep 24 20:48:55 2010
@@ -185,6 +185,7 @@ public class HColumnDescriptor implement
         desc.values.entrySet()) {
       this.values.put(e.getKey(), e.getValue());
     }
+    setMaxVersions(desc.getMaxVersions());
   }
 
   /**
@@ -357,12 +358,7 @@ public class HColumnDescriptor implement
   }
 
   /** @return maximum number of versions */
-  public synchronized int getMaxVersions() {
-    if (this.cachedMaxVersions == -1) {
-      String value = getValue(HConstants.VERSIONS);
-      this.cachedMaxVersions = (value != null)?
-        Integer.valueOf(value).intValue(): DEFAULT_VERSIONS;
-    }
+  public int getMaxVersions() {
     return this.cachedMaxVersions;
   }
 
@@ -371,6 +367,7 @@ public class HColumnDescriptor implement
    */
   public void setMaxVersions(int maxVersions) {
     setValue(HConstants.VERSIONS, Integer.toString(maxVersions));
+    cachedMaxVersions = maxVersions;
   }
 
   /**
@@ -603,12 +600,12 @@ public class HColumnDescriptor implement
         ImmutableBytesWritable value = new ImmutableBytesWritable();
         key.readFields(in);
         value.readFields(in);
-        
+
         // in version 8, the BloomFilter setting changed from bool to enum
         if (version < 8 && Bytes.toString(key.get()).equals(BLOOMFILTER)) {
           value.set(Bytes.toBytes(
               Boolean.getBoolean(Bytes.toString(value.get()))
-                ? BloomType.ROW.toString() 
+                ? BloomType.ROW.toString()
                 : BloomType.NONE.toString()));
         }
 
@@ -618,6 +615,9 @@ public class HColumnDescriptor implement
         // Convert old values.
         setValue(COMPRESSION, Compression.Algorithm.NONE.getName());
       }
+      String value = getValue(HConstants.VERSIONS);
+      this.cachedMaxVersions = (value != null)?
+          Integer.valueOf(value).intValue(): DEFAULT_VERSIONS;
     }
   }
 

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Sep 24 20:48:55 2010
@@ -202,6 +202,10 @@ public class KeyValue implements Writabl
   private int offset = 0;
   private int length = 0;
 
+  // the row cached
+  private byte [] rowCache = null;
+
+
   /** Here be dragons **/
 
   // used to achieve atomic operations in the memstore.
@@ -673,8 +677,13 @@ public class KeyValue implements Writabl
   /**
    * @return Length of key portion.
    */
+  private int keyLength = 0;
+
   public int getKeyLength() {
-    return Bytes.toInt(this.bytes, this.offset);
+    if (keyLength == 0) {
+      keyLength = Bytes.toInt(this.bytes, this.offset);
+    }
+    return keyLength;
   }
 
   /**
@@ -861,19 +870,25 @@ public class KeyValue implements Writabl
    * @return Row in a new byte array.
    */
   public byte [] getRow() {
-    int o = getRowOffset();
-    short l = getRowLength();
-    byte [] result = new byte[l];
-    System.arraycopy(getBuffer(), o, result, 0, l);
-    return result;
+    if (rowCache == null) {
+      int o = getRowOffset();
+      short l = getRowLength();
+      rowCache = new byte[l];
+      System.arraycopy(getBuffer(), o, rowCache, 0, l);
+    }
+    return rowCache;
   }
 
   /**
    *
    * @return Timestamp
    */
+  private long timestampCache = -1;
   public long getTimestamp() {
-    return getTimestamp(getKeyLength());
+    if (timestampCache == -1) {
+      timestampCache = getTimestamp(getKeyLength());
+    }
+    return timestampCache;
   }
 
   /**
@@ -1889,10 +1904,11 @@ public class KeyValue implements Writabl
 
   // HeapSize
   public long heapSize() {
-    return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
+    return ClassSize.align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) +
         ClassSize.align(ClassSize.ARRAY + length) +
-        (2 * Bytes.SIZEOF_INT) +
-        Bytes.SIZEOF_LONG);
+        (3 * Bytes.SIZEOF_INT) +
+        ClassSize.align(ClassSize.ARRAY + (rowCache == null ? 0 : rowCache.length)) +
+        (2 * Bytes.SIZEOF_LONG));
   }
 
   // this overload assumes that the length bytes have already been read,

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java Fri Sep 24 20:48:55 2010
@@ -1,278 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.executor;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
-import org.apache.hadoop.hbase.master.ServerManager;
-
-
-/**
- * Abstract base class for all HBase event handlers. Subclasses should 
- * implement the process() method where the actual handling of the event 
- * happens.
- * 
- * HBaseEventType is a list of ALL events (which also corresponds to messages - 
- * either internal to one component or between components). The event type 
- * names specify the component from which the event originated, and the 
- * component which is supposed to handle it.
- * 
- * Listeners can listen to all the events by implementing the interface 
- * HBaseEventHandlerListener, and by registering themselves as a listener. They 
- * will be called back before and after the process of every event.
- * 
- * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType 
- * after ZK refactor as it currently would clash with EventType from ZK and 
- * make the code very confusing.
- */
-public abstract class HBaseEventHandler implements Runnable
-{
-  private static final Log LOG = LogFactory.getLog(HBaseEventHandler.class);
-  // type of event this object represents
-  protected HBaseEventType eventType = HBaseEventType.NONE;
-  // is this a region server or master?
-  protected boolean isRegionServer;
-  // name of the server - this is needed for naming executors in case of tests 
-  // where region servers may be co-located.
-  protected String serverName;
-  // listeners that are called before and after an event is processed
-  protected static List<HBaseEventHandlerListener> eventHandlerListeners = 
-    Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>());  
-
-  /**
-   * This interface provides hooks to listen to various events received by the 
-   * queue. A class implementing this can listen to the updates by calling 
-   * registerListener and stop receiving updates by calling unregisterListener
-   */
-  public interface HBaseEventHandlerListener {
-    /**
-     * Called before any event is processed
-     */
-    public void beforeProcess(HBaseEventHandler event);
-    /**
-     * Called after any event is processed
-     */
-    public void afterProcess(HBaseEventHandler event);
-  }
-
-  /**
-   * These are a list of HBase events that can be handled by the various
-   * HBaseExecutorService's. All the events are serialized as byte values.
-   */
-  public enum HBaseEventType {
-    NONE (-1),
-    // Messages originating from RS (NOTE: there is NO direct communication from 
-    // RS to Master). These are a result of RS updates into ZK.
-    RS2ZK_REGION_CLOSING      (1),   // RS is in process of closing a region
-    RS2ZK_REGION_CLOSED       (2),   // RS has finished closing a region
-    RS2ZK_REGION_OPENING      (3),   // RS is in process of opening a region
-    RS2ZK_REGION_OPENED       (4),   // RS has finished opening a region
-    
-    // Updates from master to ZK. This is done by the master and there is 
-    // nothing to process by either Master or RS
-    M2ZK_REGION_OFFLINE       (50);  // Master adds this region as offline in ZK
-    
-    private final byte value;
-    
-    /**
-     * Called by the HMaster. Returns a name of the executor service given an 
-     * event type. Every event type has en entry - if the event should not be 
-     * handled just add the NONE executor.
-     * @return name of the executor service
-     */
-    public HBaseExecutorServiceType getMasterExecutorForEvent() {
-      HBaseExecutorServiceType executorServiceType = null;
-      switch(this) {
-      
-      case RS2ZK_REGION_CLOSING:
-      case RS2ZK_REGION_CLOSED:
-        executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION;
-        break;
-
-      case RS2ZK_REGION_OPENING:
-      case RS2ZK_REGION_OPENED:
-        executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION;
-        break;
-        
-      case M2ZK_REGION_OFFLINE:
-        executorServiceType = HBaseExecutorServiceType.NONE;
-        break;
-        
-      default:
-        throw new RuntimeException("Unhandled event type in the master.");
-      }
-      
-      return executorServiceType;
-    }
-
-    /**
-     * Called by the RegionServer. Returns a name of the executor service given an 
-     * event type. Every event type has en entry - if the event should not be 
-     * handled just return a null executor name.
-     * @return name of the event service
-     */
-    public static String getRSExecutorForEvent(String serverName) {
-      throw new RuntimeException("Unsupported operation.");
-    }
-    
-    /**
-     * Start the executor service that handles the passed in event type. The 
-     * server that starts these event executor services wants to handle these 
-     * event types.
-     */
-    public void startMasterExecutorService(String serverName) {
-      HBaseExecutorServiceType serviceType = getMasterExecutorForEvent();
-      if(serviceType == HBaseExecutorServiceType.NONE) {
-        throw new RuntimeException("Event type " + toString() + " not handled on master.");
-      }
-      serviceType.startExecutorService(serverName);
-    }
-
-    public static void startRSExecutorService() {
-      
-    }
-
-    HBaseEventType(int intValue) {
-      this.value = (byte)intValue;
-    }
-    
-    public byte getByteValue() {
-      return value;
-    }
-
-    public static HBaseEventType fromByte(byte value) {
-      switch(value) {
-        case  -1: return HBaseEventType.NONE;
-        case  1 : return HBaseEventType.RS2ZK_REGION_CLOSING;
-        case  2 : return HBaseEventType.RS2ZK_REGION_CLOSED;
-        case  3 : return HBaseEventType.RS2ZK_REGION_OPENING;
-        case  4 : return HBaseEventType.RS2ZK_REGION_OPENED;
-        case  50: return HBaseEventType.M2ZK_REGION_OFFLINE;
-
-        default:
-          throw new RuntimeException("Invalid byte value for conversion to HBaseEventType");
-      }
-    }
-  }
-  
-  /**
-   * Default base class constructor.
-   * 
-   * TODO: isRegionServer and serverName will go away once we do the HMaster 
-   * refactor. We will end up passing a ServerStatus which should tell us both 
-   * the name and if it is a RS or master.
-   */
-  public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) {
-    this.isRegionServer = isRegionServer;
-    this.eventType = eventType;
-    this.serverName = serverName;
-  }
-  
-  /**
-   * This is a wrapper around process, used to update listeners before and after 
-   * events are processed. 
-   */
-  public void run() {
-    // fire all beforeProcess listeners
-    for(HBaseEventHandlerListener listener : eventHandlerListeners) {
-      listener.beforeProcess(this);
-    }
-    
-    // call the main process function
-    try {
-      process();
-    } catch(Throwable t) {
-      LOG.error("Caught throwable while processing event " + eventType, t);
-    }
-
-    // fire all afterProcess listeners
-    for(HBaseEventHandlerListener listener : eventHandlerListeners) {
-      LOG.debug("Firing " + listener.getClass().getName() + 
-                ".afterProcess event listener for event " + eventType);
-      listener.afterProcess(this);
-    }
-  }
-  
-  /**
-   * This method is the main processing loop to be implemented by the various 
-   * subclasses.
-   */
-  public abstract void process();
-  
-  /**
-   * Subscribe to updates before and after processing events
-   */
-  public static void registerListener(HBaseEventHandlerListener listener) {
-    eventHandlerListeners.add(listener);
-  }
-  
-  /**
-   * Stop receiving updates before and after processing events
-   */
-  public static void unregisterListener(HBaseEventHandlerListener listener) {
-    eventHandlerListeners.remove(listener);
-  }
-  
-  public boolean isRegionServer() {
-    return isRegionServer;
-  }
-
-  /**
-   * Return the name for this event type.
-   * @return
-   */
-  public HBaseExecutorServiceType getEventHandlerName() {
-    // TODO: check for isRegionServer here
-    return eventType.getMasterExecutorForEvent();
-  }
-  
-  /**
-   * Return the event type
-   * @return
-   */
-  public HBaseEventType getHBEvent() {
-    return eventType;
-  }
-
-  /**
-   * Submits this event object to the correct executor service. This is causes
-   * this object to get executed by the correct ExecutorService.
-   */
-  public void submit() {
-    HBaseExecutorServiceType serviceType = getEventHandlerName();
-    if(serviceType == null) {
-      throw new RuntimeException("Event " + eventType + " not handled on this server " + serverName);
-    }
-    serviceType.getExecutor(serverName).submit(this);
-  }
-  
-  /**
-   * Executes this event object in the caller's thread. This is a synchronous 
-   * way of executing the event.
-   */
-  public void execute() {
-    this.run();
-  }
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java Fri Sep 24 20:48:55 2010
@@ -1,171 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.executor;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * This is a generic HBase executor service. This component abstract a
- * threadpool, a queue to which jobs can be submitted and a Runnable that
- * handles the object that is added to the queue.
- *
- * In order to create a new HBExecutorService, you need to do:
- *   HBExecutorService.startExecutorService("myService");
- *
- * In order to use the service created above, you need to override the
- * HBEventHandler class and create an event type that submits to this service.
- *
- */
-public class HBaseExecutorService
-{
-  private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class);
-  // default number of threads in the pool
-  private int corePoolSize = 1;
-  // max number of threads - maximum concurrency
-  private int maximumPoolSize = 5;
-  // how long to retain excess threads
-  private long keepAliveTimeInMillis = 1000;
-  // the thread pool executor that services the requests
-  ThreadPoolExecutor threadPoolExecutor;
-  // work queue to use - unbounded queue
-  BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
-  // name for this executor service
-  String name;
-  // hold the all the executors created in a map addressable by their names
-  static Map<String, HBaseExecutorService> executorServicesMap =
-    Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>());
-
-  
-  /**
-   * The following is a list of names for the various executor services in both 
-   * the master and the region server.
-   */
-  public enum HBaseExecutorServiceType {
-    NONE                       (-1),
-    MASTER_CLOSEREGION         (1),
-    MASTER_OPENREGION          (2);
-    
-    private final int value;
-    
-    HBaseExecutorServiceType(int intValue) {
-      this.value = intValue;
-    }
-    
-    public void startExecutorService(String serverName) {
-      // if this is NONE then there is no executor to start
-      if(value == NONE.value) {
-        throw new RuntimeException("Cannot start NONE executor type.");
-      }
-      String name = getExecutorName(serverName);
-      if(HBaseExecutorService.isExecutorServiceRunning(name)) {
-        LOG.debug("Executor service " + toString() + " already running on " + serverName);
-        return;
-      }
-      HBaseExecutorService.startExecutorService(name);
-    }
-    
-    public HBaseExecutorService getExecutor(String serverName) {
-      // if this is NONE then there is no executor
-      if(value == NONE.value) {
-        return null;
-      }
-      return HBaseExecutorService.getExecutorService(getExecutorName(serverName));
-    }
-    
-    public String getExecutorName(String serverName) {
-      // if this is NONE then there is no executor
-      if(value == NONE.value) {
-        return null;
-      }
-      return (this.toString() + "-" + serverName);
-    }
-  }
-
-
-
-  /**
-   * Start an executor service with a given name. If there was a service already
-   * started with the same name, this throws a RuntimeException.
-   * @param name Name of the service to start.
-   */
-  public static void startExecutorService(String name) {
-    if(executorServicesMap.get(name) != null) {
-      throw new RuntimeException("An executor service with the name " + name + " is already running!");
-    }
-    HBaseExecutorService hbes = new HBaseExecutorService(name);
-    executorServicesMap.put(name, hbes);
-    LOG.debug("Starting executor service: " + name);
-  }
-  
-  public static boolean isExecutorServiceRunning(String name) {
-    return (executorServicesMap.containsKey(name));
-  }
-
-  /**
-   * This method is an accessor for all the HBExecutorServices running so far
-   * addressable by name. If there is no such service, then it returns null.
-   */
-  public static HBaseExecutorService getExecutorService(String name) {
-    HBaseExecutorService executor = executorServicesMap.get(name);
-    if(executor == null) {
-      LOG.debug("Executor service [" + name + "] not found.");
-    }
-    return executor;
-  }
-  
-  public static void shutdown() {
-    for(Entry<String, HBaseExecutorService> entry : executorServicesMap.entrySet()) {
-      entry.getValue().threadPoolExecutor.shutdown();
-    }
-    executorServicesMap.clear();
-  }
-
-  protected HBaseExecutorService(String name) {
-    this.name = name;
-    // create the thread pool executor
-    threadPoolExecutor = new ThreadPoolExecutor(
-                                corePoolSize,
-                                maximumPoolSize,
-                                keepAliveTimeInMillis,
-                                TimeUnit.MILLISECONDS,
-                                workQueue
-                                );
-    // name the threads for this threadpool
-    threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name));
-  }
-
-  /**
-   * Submit the event to the queue for handling.
-   * @param event
-   */
-  public void submit(Runnable event) {
-    threadPoolExecutor.execute(event);
-  }
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java Fri Sep 24 20:48:55 2010
@@ -1,92 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.executor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
-import org.apache.hadoop.io.Writable;
-
-public class RegionTransitionEventData implements Writable {
-  private HBaseEventType hbEvent;
-  private String rsName;
-  private long timeStamp;
-  private HMsg hmsg;
-  
-  public RegionTransitionEventData() {
-  }
-
-  public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) {
-    this(hbEvent, rsName, null);
-  }
-
-  public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) {
-    this.hbEvent = hbEvent;
-    this.rsName = rsName;
-    this.timeStamp = System.currentTimeMillis();
-    this.hmsg = hmsg;
-  }
-  
-  public HBaseEventType getHbEvent() {
-    return hbEvent;
-  }
-
-  public String getRsName() {
-    return rsName;
-  }
-
-  public long getTimeStamp() {
-    return timeStamp;
-  }
-
-  public HMsg getHmsg() {
-    return hmsg;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // the event type byte
-    hbEvent = HBaseEventType.fromByte(in.readByte());
-    // the hostname of the RS sending the data
-    rsName = in.readUTF();
-    // the timestamp
-    timeStamp = in.readLong();
-    if(in.readBoolean()) {
-      // deserialized the HMsg from ZK
-      hmsg = new HMsg();
-      hmsg.readFields(in);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeByte(hbEvent.getByteValue());
-    out.writeUTF(rsName);
-    out.writeLong(System.currentTimeMillis());
-    out.writeBoolean((hmsg != null));
-    if(hmsg != null) {
-      hmsg.write(out);
-    }
-  }
-
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Fri Sep 24 20:48:55 2010
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -1051,9 +1052,14 @@ public class HFile {
         // decompressor reading into next block -- IIRC, it just grabs a
         // bunch of data w/o regard to whether decompressor is coming to end of a
         // decompression.
+
+        // We use a buffer of DEFAULT_BLOCKSIZE size.  This might be extreme.
+        // Could maybe do with less. Study and figure it: TODO
         InputStream is = this.compressAlgo.createDecompressionStream(
+            new BufferedInputStream(
           new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
             pread),
+                Math.min(DEFAULT_BLOCKSIZE, compressedSize)),
           decompressor, 0);
         buf = ByteBuffer.allocate(decompressedSize);
         IOUtils.readFully(is, buf.array(), 0, buf.capacity());
@@ -1194,7 +1200,8 @@ public class HFile {
           return null;
         }
         return new KeyValue(this.block.array(),
-            this.block.arrayOffset() + this.block.position() - 8);
+            this.block.arrayOffset() + this.block.position() - 8,
+            this.currKeyLen+this.currValueLen+8);
       }
 
       public ByteBuffer getKey() {
@@ -1238,16 +1245,17 @@ public class HFile {
             return false;
           }
           block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
-          currKeyLen = block.getInt();
-          currValueLen = block.getInt();
+          currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
+          currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
+          block.position(block.position()+8);
           blockFetches++;
           return true;
         }
         // LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
         // " kl: " + currKeyLen + " kv: " + currValueLen);
-
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
+        currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
+        currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
+        block.position(block.position()+8);
         return true;
       }
 

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri Sep 24 20:48:55 2010
@@ -486,54 +486,28 @@ public class HBaseRPC {
    * @param instance instance
    * @param bindAddress bind address
    * @param port port to bind to
-   * @param conf configuration
-   * @return Server
-   * @throws IOException e
-   */
-  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
-    throws IOException {
-    return getServer(instance, bindAddress, port, 1, false, conf);
-  }
-
-  /**
-   * Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   *
-   * @param instance instance
-   * @param bindAddress bind address
-   * @param port port to bind to
    * @param numHandlers number of handlers to start
    * @param verbose verbose flag
    * @param conf configuration
    * @return Server
    * @throws IOException e
    */
-  public static Server getServer(final Object instance, final String bindAddress, final int port,
+  public static Server getServer(final Object instance,
+                                 final Class<?>[] ifaces,
+                                 final String bindAddress, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf)
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, verbose);
   }
 
   /** An RPC Server. */
   public static class Server extends HBaseServer {
     private Object instance;
     private Class<?> implementation;
+    private Class<?> ifaces[];
     private boolean verbose;
 
-    /**
-     * Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @throws IOException e
-     */
-    public Server(Object instance, Configuration conf, String bindAddress, int port)
-      throws IOException {
-      this(instance, conf,  bindAddress, port, 1, false);
-    }
-
     private static String classNameBase(String className) {
       String[] names = className.split("\\.", -1);
       if (names == null || names.length == 0) {
@@ -551,12 +525,19 @@ public class HBaseRPC {
      * @param verbose whether each call should be logged
      * @throws IOException e
      */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
+    public Server(Object instance, final Class<?>[] ifaces,
+                  Configuration conf, String bindAddress,  int port,
                   int numHandlers, boolean verbose) throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
       this.implementation = instance.getClass();
+
       this.verbose = verbose;
+
+      this.ifaces = ifaces;
+
+      // create metrics for the advertised interfaces this server implements.
+      this.rpcMetrics.createMetrics(this.ifaces);
     }
 
     @Override

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri Sep 24 20:48:55 2010
@@ -92,14 +92,26 @@ public class HBaseRpcMetrics implements 
     return new MetricsTimeVaryingRate(key, this.registry);
   }
 
-  public synchronized void inc(String name, int amt) {
+  public void inc(String name, int amt) {
     MetricsTimeVaryingRate m = get(name);
     if (m == null) {
-      m = create(name);
+      LOG.warn("Got inc() request for method that doesnt exist: " +
+      name);
+      return; // ignore methods that don't exist.
     }
     m.inc(amt);
   }
 
+  public void createMetrics(Class<?> []ifaces) {
+    for (Class<?> iface : ifaces) {
+      Method[] methods = iface.getMethods();
+      for (Method method : methods) {
+        if (get(method.getName()) == null)
+          create(method.getName());
+      }
+    }
+  }
+
   /**
    * Push the metrics to the monitoring subsystem on doUpdate() call.
    * @param context ctx

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Sep 24 20:48:55 2010
@@ -58,6 +58,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
@@ -129,6 +131,7 @@ public abstract class HBaseServer {
   protected String bindAddress;
   protected int port;                             // port we listen on
   private int handlerCount;                       // number of handler threads
+  private int readThreads;                        // number of read threads
   protected Class<? extends Writable> paramClass; // class of call parameters
   protected int maxIdleTime;                      // the maximum idle time after
                                                   // which a client may be
@@ -227,6 +230,8 @@ public abstract class HBaseServer {
 
     private ServerSocketChannel acceptChannel = null; //the accept channel
     private Selector selector = null; //the selector that we use for the server
+    private Reader[] readers = null;
+    private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
     private Random rand = new Random();
     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
@@ -235,6 +240,8 @@ public abstract class HBaseServer {
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
 
+    private ExecutorService readPool;
+
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -247,11 +254,86 @@ public abstract class HBaseServer {
       // create a selector;
       selector= Selector.open();
 
+      readers = new Reader[readThreads];
+      readPool = Executors.newFixedThreadPool(readThreads);
+      for (int i = 0; i < readThreads; ++i) {
+        Selector readSelector = Selector.open();
+        Reader reader = new Reader(readSelector);
+        readers[i] = reader;
+        readPool.execute(reader);
+      }
+
       // Register accepts on the server socket with the selector.
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
     }
+
+
+    private class Reader implements Runnable {
+      private volatile boolean adding = false;
+      private Selector readSelector = null;
+
+      Reader(Selector readSelector) {
+        this.readSelector = readSelector;
+      }
+      public void run() {
+        LOG.info("Starting SocketReader");
+        synchronized(this) {
+          while (running) {
+            SelectionKey key = null;
+            try {
+              readSelector.select();
+              while (adding) {
+                this.wait(1000);
+              }
+
+              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+              while (iter.hasNext()) {
+                key = iter.next();
+                iter.remove();
+                if (key.isValid()) {
+                  if (key.isReadable()) {
+                    doRead(key);
+                  }
+                }
+                key = null;
+              }
+            } catch (InterruptedException e) {
+              if (running) {                     // unexpected -- log it
+                LOG.info(getName() + "caught: " +
+                    StringUtils.stringifyException(e));
+              }
+            } catch (IOException ex) {
+               LOG.error("Error in Reader", ex);
+            }
+          }
+        }
+      }
+
+      /**
+       * This gets the reader into the state that waits for the new channel
+       * to be registered with readSelector. If it was waiting in select()
+       * the thread will be woken up, otherwise whenever select() is called
+       * it will return even if there is nothing to read and wait
+       * in while(adding) for finishAdd call
+       */
+      public void startAdd() {
+        adding = true;
+        readSelector.wakeup();
+      }
+
+      public synchronized SelectionKey registerChannel(SocketChannel channel)
+        throws IOException {
+        return channel.register(readSelector, SelectionKey.OP_READ);
+      }
+
+      public synchronized void finishAdd() {
+        adding = false;
+        this.notify();
+      }
+    }
+
     /** cleanup connections from connectionList. Choose a random range
      * to scan and also have a limit on the number of the connections
      * that will be cleanedup per run. The criteria for cleanup is the time
@@ -319,8 +401,6 @@ public abstract class HBaseServer {
               if (key.isValid()) {
                 if (key.isAcceptable())
                   doAccept(key);
-                else if (key.isReadable())
-                  doRead(key);
               }
             } catch (IOException ignored) {
             }
@@ -343,11 +423,6 @@ public abstract class HBaseServer {
             cleanupConnections(true);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
       }
-        } catch (InterruptedException e) {
-          if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
-          }
         } catch (Exception e) {
           closeCurrentConnection(key);
         }
@@ -389,15 +464,17 @@ public abstract class HBaseServer {
     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
       Connection c;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
-      // accept up to 10 connections
-      for (int i=0; i<10; i++) {
-        SocketChannel channel = server.accept();
-        if (channel==null) return;
 
+      SocketChannel channel;
+      while ((channel = server.accept()) != null) {
         channel.configureBlocking(false);
         channel.socket().setTcpNoDelay(tcpNoDelay);
         channel.socket().setKeepAlive(tcpKeepAlive);
-        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+
+        Reader reader = getReader();
+        try {
+          reader.startAdd();
+          SelectionKey readKey = reader.registerChannel(channel);
         c = new Connection(channel, System.currentTimeMillis());
         readKey.attach(c);
         synchronized (connectionList) {
@@ -408,6 +485,9 @@ public abstract class HBaseServer {
           LOG.debug("Server connection from " + c.toString() +
               "; # active connections: " + numConnections +
               "; # queued calls: " + callQueue.size());
+        } finally {
+          reader.finishAdd();
+        }
       }
     }
 
@@ -452,6 +532,14 @@ public abstract class HBaseServer {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
+      readPool.shutdownNow();
+    }
+
+    // The method that will return the next reader to work with
+    // Simplistic implementation of round robin for now
+    Reader getReader() {
+      currentReader = (currentReader + 1) % readers.length;
+      return readers[currentReader];
     }
   }
 
@@ -993,6 +1081,9 @@ public abstract class HBaseServer {
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+     this.readThreads = conf.getInt(
+        "ipc.server.read.threadpool.size",
+        10);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Sep 24 20:48:55 2010
@@ -72,8 +72,6 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
@@ -186,9 +184,11 @@ public class HMaster extends Thread impl
     // Get my address and create an rpc server instance.  The rpc-server port
     // can be ephemeral...ensure we have the correct info
     HServerAddress a = new HServerAddress(getMyAddress(this.conf));
-    this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(),
-      a.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-      false, conf);
+    this.rpcServer = HBaseRPC.getServer(this,
+      new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
+        a.getBindAddress(), a.getPort(),
+        conf.getInt("hbase.regionserver.handler.count", 10), false, conf);
+
     this.address = new HServerAddress(this.rpcServer.getListenerAddress());
 
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
@@ -252,18 +252,6 @@ public class HMaster extends Thread impl
       new RegionServerOperationQueue(this.conf, this.closed);
 
     serverManager = new ServerManager(this);
-
-    
-    // Start the unassigned watcher - which will create the unassigned region 
-    // in ZK. This is needed before RegionManager() constructor tries to assign 
-    // the root region.
-    ZKUnassignedWatcher.start(this.conf, this);
-    // start the "close region" executor service
-    HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
-    // start the "open region" executor service
-    HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
-
-    
     // start the region manager
     regionManager = new RegionManager(this);
 
@@ -565,7 +553,6 @@ public class HMaster extends Thread impl
     this.rpcServer.stop();
     this.regionManager.stop();
     this.zooKeeperWrapper.close();
-    HBaseExecutorService.shutdown();
     LOG.info("HMaster main thread exiting");
   }
 

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Fri Sep 24 20:48:55 2010
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 
 import java.io.IOException;
 
@@ -115,10 +114,6 @@ public class ProcessRegionOpen extends P
       } else {
         master.getRegionManager().removeRegion(regionInfo);
       }
-      ZooKeeperWrapper zkWrapper =
-          ZooKeeperWrapper.getInstance(master.getConfiguration(),
-              HMaster.class.getName());
-      zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName());
       return true;
     }
   }

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Fri Sep 24 20:48:55 2010
@@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -96,9 +94,6 @@ public class RegionManager {
    */
    final SortedMap<String, RegionState> regionsInTransition =
     Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
-   
-   // regions in transition are also recorded in ZK using the zk wrapper
-   final ZooKeeperWrapper zkWrapper;
 
   // How many regions to assign a server at a time.
   private final int maxAssignInOneGo;
@@ -129,12 +124,10 @@ public class RegionManager {
   private final int zooKeeperNumRetries;
   private final int zooKeeperPause;
 
-  RegionManager(HMaster master) throws IOException {
+  RegionManager(HMaster master) {
     Configuration conf = master.getConfiguration();
 
     this.master = master;
-    this.zkWrapper =
-        ZooKeeperWrapper.getInstance(conf, HMaster.class.getName());
     this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
     this.loadBalancer = new LoadBalancer(conf);
 
@@ -172,18 +165,10 @@ public class RegionManager {
     unsetRootRegion();
     if (!master.getShutdownRequested().get()) {
       synchronized (regionsInTransition) {
-        String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString();
-        byte[] data = null;
-        try {
-          data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
-        } catch (IOException e) {
-          LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
-        }
-        zkWrapper.createOrUpdateUnassignedRegion(
-            HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data);
-        LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
-        RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED);
-        regionsInTransition.put(regionName, s);
+        RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO,
+            RegionState.State.UNASSIGNED);
+        regionsInTransition.put(
+            HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s);
         LOG.info("ROOT inserted into regionsInTransition");
       }
     }
@@ -338,15 +323,6 @@ public class RegionManager {
     LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName());
     rs.setPendingOpen(sinfo.getServerName());
     synchronized (this.regionsInTransition) {
-      byte[] data = null;
-      try {
-        data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
-      } catch (IOException e) {
-        LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
-      }
-      zkWrapper.createOrUpdateUnassignedRegion(
-          rs.getRegionInfo().getEncodedName(), data);
-      LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
       this.regionsInTransition.put(regionName, rs);
     }
 
@@ -987,17 +963,6 @@ public class RegionManager {
     synchronized(this.regionsInTransition) {
       s = regionsInTransition.get(info.getRegionNameAsString());
       if (s == null) {
-        byte[] data = null;
-        try {
-          data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
-        } catch (IOException e) {
-          // TODO: Review what we should do here.  If Writables work this
-          //       should never happen
-          LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
-        }
-        zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data);          
-        LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() + 
-                  " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
         s = new RegionState(info, RegionState.State.UNASSIGNED);
         regionsInTransition.put(info.getRegionNameAsString(), s);
       }
@@ -1243,8 +1208,6 @@ public class RegionManager {
   public void setRootRegionLocation(HServerAddress address) {
     writeRootRegionLocationToZooKeeper(address);
     synchronized (rootRegionLocation) {
-      // the root region has been assigned, remove it from transition in ZK
-      zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName());
       rootRegionLocation.set(new HServerAddress(address));
       rootRegionLocation.notifyAll();
     }

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java Fri Sep 24 20:48:55 2010
@@ -74,7 +74,7 @@ class ZKMasterAddressWatcher implements 
     } else if(type.equals(EventType.NodeCreated) &&
         event.getPath().equals(this.zookeeper.clusterStateZNode)) {
       LOG.debug("Resetting watch on cluster state node.");
-      this.zookeeper.setClusterStateWatch();
+      this.zookeeper.setClusterStateWatch(this);
     }
   }
 
@@ -87,7 +87,7 @@ class ZKMasterAddressWatcher implements 
       try {
         LOG.debug("Waiting for master address ZNode to be deleted " +
           "(Also watching cluster state node)");
-        this.zookeeper.setClusterStateWatch();
+        this.zookeeper.setClusterStateWatch(this);
         wait();
       } catch (InterruptedException e) {
       }
@@ -110,7 +110,7 @@ class ZKMasterAddressWatcher implements 
       }
       if(this.zookeeper.writeMasterAddress(address)) {
         this.zookeeper.setClusterState(true);
-        this.zookeeper.setClusterStateWatch();
+        this.zookeeper.setClusterStateWatch(this);
         // Watch our own node
         this.zookeeper.readMasterAddress(this);
         return true;

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Fri Sep 24 20:48:55 2010
@@ -1,185 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
-import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
-import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-/**
- * Watches the UNASSIGNED znode in ZK for the master, and handles all events 
- * relating to region transitions.
- */
-public class ZKUnassignedWatcher implements Watcher {
-  private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
-
-  private ZooKeeperWrapper zkWrapper;
-  String serverName;
-  ServerManager serverManager;
-
-  public static void start(Configuration conf, HMaster master) 
-  throws IOException {
-    new ZKUnassignedWatcher(conf, master);
-    LOG.debug("Started ZKUnassigned watcher");
-  }
-
-  public ZKUnassignedWatcher(Configuration conf, HMaster master) 
-  throws IOException {
-    this.serverName = master.getHServerAddress().toString();
-    this.serverManager = master.getServerManager();
-    zkWrapper = ZooKeeperWrapper.getInstance(conf, HMaster.class.getName());
-    String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
-    
-    // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then 
-    // delete it.
-    if(master.isClusterStartup() && zkWrapper.exists(unassignedZNode, false)) {
-      LOG.info("Cluster start, but found " + unassignedZNode + ", deleting it.");
-      try {
-        zkWrapper.deleteZNode(unassignedZNode, true);
-      } catch (KeeperException e) {
-        LOG.error("Could not delete znode " + unassignedZNode, e);
-        throw new IOException(e);
-      } catch (InterruptedException e) {
-        LOG.error("Could not delete znode " + unassignedZNode, e);
-        throw new IOException(e);
-      }
-    }
-    
-    // If the UNASSIGNED ZNode does not exist, create it.
-    zkWrapper.createZNodeIfNotExists(unassignedZNode);
-    
-    // TODO: get the outstanding changes in UNASSIGNED
-
-    // Set a watch on Zookeeper's UNASSIGNED node if it exists.
-    zkWrapper.registerListener(this);
-  }
-
-  /**
-   * This is the processing loop that gets triggered from the ZooKeeperWrapper.
-   * This zookeeper events process function dies the following:
-   *   - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged
-   *   - IGNORES the following events: None, NodeDeleted
-   */
-  @Override
-  public synchronized void process(WatchedEvent event) {
-    EventType type = event.getType();
-    LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type +
-              " state:" + event.getState() +
-              " path:" + event.getPath());
-
-    // Handle the ignored events
-    if(type.equals(EventType.None)       ||
-       type.equals(EventType.NodeDeleted)) {
-      return;
-    }
-
-    // check if the path is for the UNASSIGNED directory we care about
-    if(event.getPath() == null ||
-       !event.getPath().startsWith(zkWrapper.getZNodePathForHBase(
-           zkWrapper.getRegionInTransitionZNode()))) {
-      return;
-    }
-
-    try
-    {
-      /*
-       * If a node is created in the UNASSIGNED directory in zookeeper, then:
-       *   1. watch its updates (this is an unassigned region).
-       *   2. read to see what its state is and handle as needed (state may have
-       *      changed before we started watching it)
-       */
-      if(type.equals(EventType.NodeCreated)) {
-        zkWrapper.watchZNode(event.getPath());
-        handleRegionStateInZK(event.getPath());
-      }
-      /*
-       * Data on some node has changed. Read to see what the state is and handle
-       * as needed.
-       */
-      else if(type.equals(EventType.NodeDataChanged)) {
-        handleRegionStateInZK(event.getPath());
-      }
-      /*
-       * If there were some nodes created then watch those nodes
-       */
-      else if(type.equals(EventType.NodeChildrenChanged)) {
-        List<ZNodePathAndData> newZNodes =
-            zkWrapper.watchAndGetNewChildren(event.getPath());
-        for(ZNodePathAndData zNodePathAndData : newZNodes) {
-          LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
-          handleRegionStateInZK(zNodePathAndData.getzNodePath(),
-              zNodePathAndData.getData());
-        }
-      }
-    }
-    catch (IOException e)
-    {
-      LOG.error("Could not process event from ZooKeeper", e);
-    }
-  }
-
-  /**
-   * Read the state of a node in ZK, and do the needful. We want to do the
-   * following:
-   *   1. If region's state is updated as CLOSED, invoke the ClosedRegionHandler.
-   *   2. If region's state is updated as OPENED, invoke the OpenRegionHandler.
-   * @param zNodePath
-   * @throws IOException
-   */
-  private void handleRegionStateInZK(String zNodePath) throws IOException {
-    byte[] data = zkWrapper.readZNode(zNodePath, null);
-    handleRegionStateInZK(zNodePath, data);
-  }
-  
-  private void handleRegionStateInZK(String zNodePath, byte[] data) {
-    // a null value is set when a node is created, we don't need to handle this
-    if(data == null) {
-      return;
-    }
-    String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
-    String region = zNodePath.substring(
-        zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
-    HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
-    LOG.debug("Got event type [ " + rsEvent + " ] for region " + region);
-
-    // if the node was CLOSED then handle it
-    if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
-      new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
-    }
-    // if the region was OPENED then handle that
-    else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED || 
-            rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) {
-      new MasterOpenRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
-    }
-  }
-}
-

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java Fri Sep 24 20:48:55 2010
@@ -1,94 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.handler;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
-
-/**
- * This is the event handler for all events relating to closing regions on the
- * HMaster. The following event types map to this handler:
- *   - RS_REGION_CLOSING
- *   - RS_REGION_CLOSED
- */
-public class MasterCloseRegionHandler extends HBaseEventHandler
-{
-  private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class);
-  
-  private String regionName;
-  protected byte[] serializedData;
-  RegionTransitionEventData hbEventData;
-  ServerManager serverManager;
-  
-  public MasterCloseRegionHandler(HBaseEventType eventType, 
-                                  ServerManager serverManager, 
-                                  String serverName, 
-                                  String regionName, 
-                                  byte[] serializedData) {
-    super(false, serverName, eventType);
-    this.regionName = regionName;
-    this.serializedData = serializedData;
-    this.serverManager = serverManager;
-  }
-
-  /**
-   * Handle the various events relating to closing regions. We can get the 
-   * following events here:
-   *   - RS_REGION_CLOSING : No-op
-   *   - RS_REGION_CLOSED  : The region is closed. If we are not in a shutdown 
-   *                         state, find the RS to open this region. This could 
-   *                         be a part of a region move, or just that the RS has 
-   *                         died. Should result in a M_REQUEST_OPENREGION event 
-   *                         getting created.
-   */
-  @Override
-  public void process()
-  {
-    LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
-    // handle RS_REGION_CLOSED events
-    handleRegionClosedEvent();
-  }
-  
-  private void handleRegionClosedEvent() {
-    try {
-      if(hbEventData == null) {
-        hbEventData = new RegionTransitionEventData();
-        Writables.getWritable(serializedData, hbEventData);
-      }
-    } catch (IOException e) {
-      LOG.error("Could not deserialize additional args for Close region", e);
-    }
-    // process the region close - this will cause the reopening of the 
-    // region as a part of the heartbeat of some RS
-    serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo());
-    LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString());
-  }
-  
-  public String getRegionName() {
-    return regionName;
-  }
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java Fri Sep 24 20:48:55 2010
@@ -1,112 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.handler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
-
-/**
- * This is the event handler for all events relating to opening regions on the
- * HMaster. This could be one of the following:
- *   - notification that a region server is "OPENING" a region
- *   - notification that a region server has "OPENED" a region
- * The following event types map to this handler:
- *   - RS_REGION_OPENING
- *   - RS_REGION_OPENED
- */
-public class MasterOpenRegionHandler extends HBaseEventHandler {
-  private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class);
-  // other args passed in a byte array form
-  protected byte[] serializedData;
-  private String regionName;
-  private RegionTransitionEventData hbEventData;
-  ServerManager serverManager;
-
-  public MasterOpenRegionHandler(HBaseEventType eventType, 
-                                 ServerManager serverManager, 
-                                 String serverName, 
-                                 String regionName, 
-                                 byte[] serData) {
-    super(false, serverName, eventType);
-    this.regionName = regionName;
-    this.serializedData = serData;
-    this.serverManager = serverManager;
-  }
-
-  /**
-   * Handle the various events relating to opening regions. We can get the 
-   * following events here:
-   *   - RS_REGION_OPENING : Keep track to see how long the region open takes. 
-   *                         If the RS is taking too long, then revert the 
-   *                         region back to closed state so that it can be 
-   *                         re-assigned.
-   *   - RS_REGION_OPENED  : The region is opened. Add an entry into META for  
-   *                         the RS having opened this region. Then delete this 
-   *                         entry in ZK.
-   */
-  @Override
-  public void process()
-  {
-    LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
-    if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENING) {
-      handleRegionOpeningEvent();
-    }
-    else if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENED) {
-      handleRegionOpenedEvent();
-    }
-  }
-  
-  private void handleRegionOpeningEvent() {
-    // TODO: not implemented. 
-    LOG.debug("NO-OP call to handling region opening event");
-    // Keep track to see how long the region open takes. If the RS is taking too 
-    // long, then revert the region back to closed state so that it can be 
-    // re-assigned.
-  }
-
-  private void handleRegionOpenedEvent() {
-    try {
-      if(hbEventData == null) {
-        hbEventData = new RegionTransitionEventData();
-        Writables.getWritable(serializedData, hbEventData);
-      }
-    } catch (IOException e) {
-      LOG.error("Could not deserialize additional args for Open region", e);
-    }
-    LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName);
-    HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName());
-    ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
-    serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs);
-    if(returnMsgs.size() > 0) {
-      LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + 
-                " about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString());
-    }
-  }
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Sep 24 20:48:55 2010
@@ -307,9 +307,11 @@ public class HRegionServer implements HR
     this.stopRequested.set(false);
 
     // Server to handle client requests
-    this.server = HBaseRPC.getServer(this, address.getBindAddress(),
-      address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-      false, conf);
+    this.server = HBaseRPC.getServer(this,
+        new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
+        OnlineRegions.class}, address.getBindAddress(),
+        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+        false, conf);
     this.server.setErrorHandler(this);
     // Address is giving a default IP for the moment. Will be changed after
     // calling the master.
@@ -728,15 +730,15 @@ public class HRegionServer implements HR
           this.serverInfo.getServerAddress() + ", Now=" + hra);
         this.serverInfo.setServerAddress(hsa);
       }
-      
-      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this 
+
+      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
       // config param for task trackers, but we can piggyback off of it.
       if (this.conf.get("mapred.task.id") == null) {
         this.conf.set("mapred.task.id", 
             "hb_rs_" + this.serverInfo.getServerName() + "_" +
             System.currentTimeMillis());
       }
-      
+
       // Master sent us hbase.rootdir to use. Should be fully qualified
       // path with file system specification included.  Set 'fs.defaultFS'
       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
@@ -1259,7 +1261,14 @@ public class HRegionServer implements HR
         if (LOG.isDebugEnabled())
           LOG.debug("sending initial server load: " + hsl);
         lastMsg = System.currentTimeMillis();
-        zooKeeperWrapper.writeRSLocation(this.serverInfo);
+        boolean startCodeOk = false;
+        while(!startCodeOk) {
+          this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo);
+          startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo);
+          if(!startCodeOk) {
+           LOG.debug("Start code already taken, trying another one");
+          }
+        }
         result = this.hbaseMaster.regionServerStartup(this.serverInfo);
         break;
       } catch (IOException e) {
@@ -1270,6 +1279,26 @@ public class HRegionServer implements HR
     return result;
   }
 
+  private HServerInfo createServerInfoWithNewStartCode(final HServerInfo hsi) {
+    return new HServerInfo(hsi.getServerAddress(), hsi.getInfoPort(),
+      hsi.getHostname());
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportOpen(HRegionInfo region) {
+    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportClose(HRegionInfo region) {
+    reportClose(region, null);
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportClose(final HRegionInfo region, final byte[] message) {
+    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
+  }
+
   /**
    * Add to the outbound message buffer
    *
@@ -1434,12 +1463,8 @@ public class HRegionServer implements HR
   void openRegion(final HRegionInfo regionInfo) {
     Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
     HRegion region = this.onlineRegions.get(mapKey);
-    RSZookeeperUpdater zkUpdater = 
-      new RSZookeeperUpdater(conf, serverInfo.getServerName(),
-          regionInfo.getEncodedName());
     if (region == null) {
       try {
-        zkUpdater.startRegionOpenEvent(null, true);
         region = instantiateRegion(regionInfo, this.hlog);
         // Startup a compaction early if one is needed, if region has references
         // or if a store has too many store files
@@ -1454,25 +1479,12 @@ public class HRegionServer implements HR
         // TODO: add an extra field in HRegionInfo to indicate that there is
         // an error. We can't do that now because that would be an incompatible
         // change that would require a migration
-        try {
-          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, 
-                               regionInfo, 
-                               StringUtils.stringifyException(t).getBytes());
-          zkUpdater.abortOpenRegion(hmsg);
-        } catch (IOException e1) {
-          // TODO: Can we recover? Should be throw RTE?
-          LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1);
-        }
+        reportClose(regionInfo, StringUtils.stringifyException(t).getBytes());
         return;
       }
       addToOnlineRegions(region);
     }
-    try {
-      HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
-      zkUpdater.finishRegionOpenEvent(hmsg);
-    } catch (IOException e) {
-      LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e);
-    }
+    reportOpen(regionInfo);
   }
 
   /*
@@ -1511,20 +1523,11 @@ public class HRegionServer implements HR
 
   protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
   throws IOException {
-    RSZookeeperUpdater zkUpdater = null;
-    if(reportWhenCompleted) {
-      zkUpdater = new RSZookeeperUpdater(conf,
-          serverInfo.getServerName(), hri.getEncodedName());
-      zkUpdater.startRegionCloseEvent(null, false);
-    }
     HRegion region = this.removeFromOnlineRegions(hri);
     if (region != null) {
       region.close();
       if(reportWhenCompleted) {
-        if(zkUpdater != null) {
-          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null);
-          zkUpdater.finishRegionCloseEvent(hmsg);
-        }
+        reportClose(hri);
       }
     }
   }
@@ -2549,4 +2552,4 @@ public class HRegionServer implements HR
   public int getNumberOfOnlineRegions() {
     return onlineRegions.size();
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Sep 24 20:48:55 2010
@@ -397,7 +397,7 @@ public class MemStore implements HeapSiz
         KeyValue kv = it.next();
 
         // if this isnt the row we are interested in, then bail:
-        if (!firstKv.matchingRow(kv)) {
+        if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv) ) {
           break; // rows dont match, bail.
         }
 
@@ -430,7 +430,7 @@ public class MemStore implements HeapSiz
         }
 
         // if this isnt the row we are interested in, then bail:
-        if (!firstKv.matchingRow(kv)) {
+        if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv)) {
           break; // rows dont match, bail.
         }
 
@@ -439,7 +439,7 @@ public class MemStore implements HeapSiz
           // to be extra safe we only remove Puts that have a memstoreTS==0
           if (kv.getType() == KeyValue.Type.Put.getCode()) {
             // false means there was a change, so give us the size.
-            addedSize -= heapSizeChange(kv, false);
+            addedSize -= heapSizeChange(kv, true);
 
             it.remove();
           }



Mime
View raw message