hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject svn commit: r1001066 [2/2] - in /hbase/branches/0.89.20100924: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/executor/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main...
Date Fri, 24 Sep 2010 20:48:56 GMT
Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java Fri Sep 24 20:48:55 2010
@@ -1,164 +0,0 @@
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * This is a helper class for region servers to update various states in 
- * Zookeeper. The various updates are abstracted out here. 
- * 
- * The "startRegionXXX" methods are to be called first, followed by the 
- * "finishRegionXXX" methods. Supports updating zookeeper periodically as a 
- * part of the "startRegionXXX". Currently handles the following state updates:
- *   - Close region
- *   - Open region
- */
-// TODO: make this thread local, in which case it is re-usable per thread
-public class RSZookeeperUpdater {
-  private static final Log LOG = LogFactory.getLog(RSZookeeperUpdater.class);
-  private final String regionServerName;
-  private String regionName = null;
-  private String regionZNode = null;
-  private ZooKeeperWrapper zkWrapper = null;
-  private int zkVersion = 0;
-  HBaseEventType lastUpdatedState;
-
-  public RSZookeeperUpdater(Configuration conf,
-                            String regionServerName, String regionName) {
-    this(conf, regionServerName, regionName, 0);
-  }
-  
-  public RSZookeeperUpdater(Configuration conf, String regionServerName,
-                            String regionName, int zkVersion) {
-    this.zkWrapper = ZooKeeperWrapper.getInstance(conf, regionServerName);
-    this.regionServerName = regionServerName;
-    this.regionName = regionName;
-    // get the region ZNode we have to create
-    this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName);
-    this.zkVersion = zkVersion;
-  }
-  
-  /**
-   * This method updates the various states in ZK to inform the master that the 
-   * region server has started closing the region.
-   * @param updatePeriodically - if true, periodically updates the state in ZK
-   */
-  public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
-    // if this ZNode already exists, something is wrong
-    if(zkWrapper.exists(regionZNode, true)) {
-      String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
-      LOG.error(msg);
-      throw new IOException(msg);
-    }
-    
-    // create the region node in the unassigned directory first
-    zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
-
-    // update the data for "regionName" ZNode in unassigned to CLOSING
-    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg);
-    
-    // TODO: implement the updatePeriodically logic here
-  }
-
-  /**
-   * This method updates the states in ZK to signal that the region has been 
-   * closed. This will stop the periodic updater thread if one was started.
-   * @throws IOException
-   */
-  public void finishRegionCloseEvent(HMsg hmsg) throws IOException {    
-    // TODO: stop the updatePeriodically here
-
-    // update the data for "regionName" ZNode in unassigned to CLOSED
-    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
-  }
-  
-  /**
-   * This method updates the various states in ZK to inform the master that the 
-   * region server has started opening the region.
-   * @param updatePeriodically - if true, periodically updates the state in ZK
-   */
-  public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
-    Stat stat = new Stat();
-    byte[] data = zkWrapper.readZNode(regionZNode, stat);
-    // if there is no ZNode for this region, something is wrong
-    if(data == null) {
-      String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region.";
-      LOG.error(msg);
-      throw new IOException(msg);
-    }
-    // if the ZNode is not in the closed state, something is wrong
-    HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
-    if(rsEvent != HBaseEventType.RS2ZK_REGION_CLOSED && rsEvent != HBaseEventType.M2ZK_REGION_OFFLINE) {
-      String msg = "ZNode " + regionZNode + " is not in CLOSED/OFFLINE state (state = " + rsEvent + "), will NOT open region.";
-      LOG.error(msg);
-      throw new IOException(msg);
-    }
-
-    // get the version to update from ZK
-    zkVersion = stat.getVersion();
-
-    // update the data for "regionName" ZNode in unassigned to CLOSING
-    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENING, hmsg);
-    
-    // TODO: implement the updatePeriodically logic here
-  }
-  
-  /**
-   * This method updates the states in ZK to signal that the region has been 
-   * opened. This will stop the periodic updater thread if one was started.
-   * @throws IOException
-   */
-  public void finishRegionOpenEvent(HMsg hmsg) throws IOException {
-    // TODO: stop the updatePeriodically here
-
-    // update the data for "regionName" ZNode in unassigned to CLOSED
-    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENED, hmsg);
-  }
-  
-  public boolean isClosingRegion() {
-    return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_CLOSING);
-  }
-
-  public boolean isOpeningRegion() {
-    return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_OPENING);
-  }
-
-  public void abortOpenRegion(HMsg hmsg) throws IOException {
-    LOG.error("Aborting open of region " + regionName);
-
-    // TODO: stop the updatePeriodically for start open region here
-
-    // update the data for "regionName" ZNode in unassigned to CLOSED
-    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
-  }
-
-  private void updateZKWithEventData(HBaseEventType hbEventType, HMsg hmsg) throws IOException {
-    // update the data for "regionName" ZNode in unassigned to "hbEventType"
-    byte[] data = null;
-    try {
-      data = Writables.getBytes(new RegionTransitionEventData(hbEventType, regionServerName, hmsg));
-    } catch (IOException e) {
-      LOG.error("Error creating event data for " + hbEventType, e);
-    }
-    LOG.debug("Updating ZNode " + regionZNode + 
-              " with [" + hbEventType + "]" +
-              " expected version = " + zkVersion);
-    lastUpdatedState = hbEventType;
-    zkWrapper.writeZNode(regionZNode, data, zkVersion, true);
-    zkVersion++;
-  }
-}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Sep 24 20:48:55 2010
@@ -635,7 +635,8 @@ public class Store implements HeapSize {
         // we'd end up with nothing to compact.  To protect against this, we'll
         // compact the tail -- up to the last 4 files -- of filesToCompact
         // regardless.
-        int tail = Math.min(countOfFiles, 4);
+        // BANDAID for HBASE-2990, setting to 2
+        int tail = Math.min(countOfFiles, 2);
         for (point = 0; point < (countOfFiles - tail); point++) {
           if (((fileSizes[point] < fileSizes[point + 1] * 2) &&
                (countOfFiles - point) <= maxFilesToCompact)) {

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Sep 24 20:48:55 2010
@@ -1322,7 +1322,11 @@ public class HLog implements Syncable {
             recoverFileLease(fs, logPath, conf);
             parseHLog(log, editsByRegion, fs, conf);
             processedLogs.add(logPath);
-           } catch (IOException e) {
+          } catch (EOFException eof) {
+            // truncated files are expected if a RS crashes (see HBASE-2643)
+            LOG.info("EOF from hlog " + logPath + ".  continuing");
+            processedLogs.add(logPath);
+          } catch (IOException e) {
              if (skipErrors) {
                LOG.warn("Got while parsing hlog " + logPath +
                  ". Marking as corrupted", e);
@@ -1567,8 +1571,8 @@ public class HLog implements Syncable {
         queue.addLast(entry);
         editsCount++;
       }
-      LOG.debug("Pushed=" + editsCount + " entries from " + path);
     } finally {
+      LOG.debug("Pushed=" + editsCount + " entries from " + path);
       try {
         if (in != null) {
           in.close();

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Fri Sep 24 20:48:55 2010
@@ -21,7 +21,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-
+import java.lang.Class;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+ 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -97,6 +100,7 @@ public class SequenceFileLogReader imple
   // Needed logging exceptions
   Path path;
   int edit = 0;
+  long entryStart = 0;
 
   public SequenceFileLogReader() { }
 
@@ -124,6 +128,7 @@ public class SequenceFileLogReader imple
 
   @Override
   public HLog.Entry next(HLog.Entry reuse) throws IOException {
+    this.entryStart = this.reader.getPosition();
     HLog.Entry e = reuse;
     if (e == null) {
       HLogKey key = HLog.newKey(conf);
@@ -162,7 +167,28 @@ public class SequenceFileLogReader imple
     } catch (IOException e) {
       Log.warn("Failed getting position to add to throw", e);
     }
-    return new IOException((this.path == null? "": this.path.toString()) +
-      ", pos=" + pos + ", edit=" + this.edit, ioe);
+
+    // See what SequenceFile.Reader thinks is the end of the file
+    long end = Long.MAX_VALUE;
+    try {
+      Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
+      fEnd.setAccessible(true);
+      end = fEnd.getLong(this.reader);
+    } catch(Exception e) { /* reflection fail. keep going */ }
+
+    String msg = (this.path == null? "": this.path.toString()) +
+      ", entryStart=" + entryStart + ", pos=" + pos + 
+      ((end == Long.MAX_VALUE) ? "" : ", end=" + end) + 
+      ", edit=" + this.edit;
+
+    // Enhance via reflection so we don't change the original class type
+    try {
+      return (IOException) ioe.getClass()
+        .getConstructor(String.class)
+        .newInstance(msg)
+        .initCause(ioe);
+    } catch(Exception e) { /* reflection fail. keep going */ }
+    
+    return ioe;
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Sep 24 20:48:55 2010
@@ -72,7 +72,7 @@ public class Replication implements LogE
         conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
     if (replication) {
       this.zkHelper = new ReplicationZookeeperWrapper(
-        ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+        ZooKeeperWrapper.createInstance(conf, hsi.getServerName()), conf,
         this.replicating, hsi.getServerName());
       this.replicationMaster = zkHelper.isReplicationMaster();
       this.replicationManager = this.replicationMaster ?

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Fri Sep 24 20:48:55 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.replicat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -36,6 +35,8 @@ import org.apache.hadoop.hbase.util.Byte
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -87,7 +88,7 @@ public class ReplicationSink {
    * @param entries
    * @throws IOException
    */
-  public synchronized void replicateEntries(HLog.Entry[] entries)
+  public void replicateEntries(HLog.Entry[] entries)
       throws IOException {
     if (entries.length == 0) {
       return;
@@ -96,8 +97,9 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
-      List<Put> puts = new ArrayList<Put>();
+      // Map of table => list of puts, we only want to flushCommits once per
+      // invocation of this method per table.
+      Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         List<KeyValue> kvs = edit.getKeyValues();
@@ -114,9 +116,11 @@ public class ReplicationSink {
           }
           delete(entry.getKey().getTablename(), delete);
         } else {
-          // Switching table, flush
-          if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
-            put(lastTable, puts);
+          byte[] table = entry.getKey().getTablename();
+          List<Put> tableList = puts.get(table);
+          if (tableList == null) {
+            tableList = new ArrayList<Put>();
+            puts.put(table, tableList);
           }
           // With mini-batching, we need to expect multiple rows per edit
           byte[] lastKey = kvs.get(0).getRow();
@@ -124,18 +128,19 @@ public class ReplicationSink {
               kvs.get(0).getTimestamp());
           for (KeyValue kv : kvs) {
             if (!Bytes.equals(lastKey, kv.getRow())) {
-              puts.add(put);
+              tableList.add(put);
               put = new Put(kv.getRow(), kv.getTimestamp());
             }
             put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
             lastKey = kv.getRow();
           }
-          puts.add(put);
-          lastTable = entry.getKey().getTablename();
+          tableList.add(put);
         }
         totalReplicated++;
       }
-      put(lastTable, puts);
+      for(byte [] table : puts.keySet()) {
+        put(table, puts.get(table));
+      }
       this.metrics.setAgeOfLastAppliedOp(
           entries[entries.length-1].getKey().getWriteTime());
       this.metrics.appliedBatchesRate.inc(1);
@@ -174,8 +179,6 @@ public class ReplicationSink {
       table = this.pool.getTable(tableName);
       table.put(puts);
       this.metrics.appliedOpsRate.inc(puts.size());
-      this.pool.putTable(table);
-      puts.clear();
     } finally {
       if (table != null) {
         this.pool.putTable(table);
@@ -195,7 +198,6 @@ public class ReplicationSink {
       table = this.pool.getTable(tableName);
       table.delete(delete);
       this.metrics.appliedOpsRate.inc(1);
-      this.pool.putTable(table);
     } finally {
       if (table != null) {
         this.pool.putTable(table);

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Sep 24 20:48:55 2010
@@ -286,10 +286,10 @@ public class ReplicationSourceManager im
     LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
     SortedMap<String, SortedSet<String>> newQueues =
         this.zkHelper.copyQueuesFromRS(rsZnode);
+    this.zkHelper.deleteRsQueues(rsZnode);
     if (newQueues == null || newQueues.size() == 0) {
       return;
     }
-    this.zkHelper.deleteRsQueues(rsZnode);
 
     for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
       String peerId = entry.getKey();

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java Fri Sep 24 20:48:55 2010
@@ -18,6 +18,9 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
 import org.apache.hadoop.hbase.metrics.MetricsRate;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -73,6 +76,11 @@ public class ReplicationSourceMetrics im
     metricsRecord = MetricsUtil.createRecord(context, "replication");
     metricsRecord.setTag("RegionServer", name);
     context.registerUpdater(this);
+    try {
+      id = URLEncoder.encode(id, "UTF8");
+    } catch (UnsupportedEncodingException e) {
+      id = "CAN'T ENCODE UTF8";
+    }
     // export for JMX
     new ReplicationStatistics(this.registry, "ReplicationSource for " + id);
   }

Modified: hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89.20100924/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Fri Sep 24 20:48:55 2010
@@ -25,7 +25,6 @@ import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -125,14 +123,6 @@ public class ZooKeeperWrapper implements
    * State of the cluster - if up and running or shutting down
    */
   public final String clusterStateZNode;
-  /*
-   * Regions that are in transition
-   */
-  private final String rgnsInTransitZNode;
-  /*
-   * List of ZNodes in the unassgined region that are already being watched
-   */
-  private Set<String> unassignedZNodesWatched = new HashSet<String>();
 
   private List<Watcher> listeners = new ArrayList<Watcher>();
 
@@ -196,7 +186,6 @@ public class ZooKeeperWrapper implements
 
     rootRegionZNode     = getZNode(parentZNode, rootServerZNodeName);
     rsZNode             = getZNode(parentZNode, rsZNodeName);
-    rgnsInTransitZNode  = getZNode(parentZNode, regionsInTransitZNodeName);
     masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
     clusterStateZNode   = getZNode(parentZNode, stateZNodeName);
   }
@@ -402,9 +391,9 @@ public class ZooKeeperWrapper implements
    * Watch the state of the cluster, up or down
    * @param watcher Watcher to set on cluster state node
    */
-  public void setClusterStateWatch() {
+  public void setClusterStateWatch(Watcher watcher) {
     try {
-      zooKeeper.exists(clusterStateZNode, this);
+      zooKeeper.exists(clusterStateZNode, watcher == null ? this : watcher);
     } catch (InterruptedException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e);
     } catch (KeeperException e) {
@@ -975,14 +964,6 @@ public class ZooKeeperWrapper implements
   }
 
   /**
-   * Get the znode that has all the regions in transition.
-   * @return path to znode
-   */
-  public String getRegionInTransitionZNode() {
-    return this.rgnsInTransitZNode;
-  }
-
-  /**
    * Get the path of this region server's znode
    * @return path to znode
    */
@@ -1080,191 +1061,6 @@ public class ZooKeeperWrapper implements
       }
     }
 
-  /**
-   * Given a region name and some data, this method creates a new the region
-   * znode data under the UNASSGINED znode with the data passed in. This method
-   * will not update data for existing znodes.
-   *
-   * @param regionName - encoded name of the region
-   * @param data - new serialized data to update the region znode
-   */
-  private void createUnassignedRegion(String regionName, byte[] data) {
-    String znode = getZNode(getRegionInTransitionZNode(), regionName);
-    if(LOG.isDebugEnabled()) {
-      // check if this node already exists -
-      //   - it should not exist
-      //   - if it does, it should be in the CLOSED state
-      if(exists(znode, true)) {
-        Stat stat = new Stat();
-        byte[] oldData = null;
-        try {
-          oldData = readZNode(znode, stat);
-        } catch (IOException e) {
-          LOG.error("Error reading data for " + znode);
-        }
-        if(oldData == null) {
-          LOG.debug("While creating UNASSIGNED region " + regionName + " exists with no data" );
-        }
-        else {
-          LOG.debug("While creating UNASSIGNED region " + regionName + " exists, state = " + (HBaseEventType.fromByte(oldData[0])));
-        }
-      }
-      else {
-        if(data == null) {
-          LOG.debug("Creating UNASSIGNED region " + regionName + " with no data" );
-        }
-        else {
-          LOG.debug("Creating UNASSIGNED region " + regionName + " in state = " + (HBaseEventType.fromByte(data[0])));
-        }
-      }
-    }
-    synchronized(unassignedZNodesWatched) {
-      unassignedZNodesWatched.add(znode);
-      createZNodeIfNotExists(znode, data, CreateMode.PERSISTENT, true);
-    }
-  }
-
-  /**
-   * Given a region name and some data, this method updates the region znode
-   * data under the UNASSGINED znode with the latest data. This method will
-   * update the znode data only if it already exists.
-   *
-   * @param regionName - encoded name of the region
-   * @param data - new serialized data to update the region znode
-   */
-  public void updateUnassignedRegion(String regionName, byte[] data) {
-    String znode = getZNode(getRegionInTransitionZNode(), regionName);
-    // this is an update - make sure the node already exists
-    if(!exists(znode, true)) {
-      LOG.error("Cannot update " + znode + " - node does not exist" );
-      return;
-    }
-
-    Stat stat = new Stat();
-    byte[] oldData = null;
-    try {
-      oldData = readZNode(znode, stat);
-    } catch (IOException e) {
-      LOG.error("Error reading data for " + znode);
-    }
-    // If there is no data in the ZNode, then update it
-    if(oldData == null) {
-      LOG.debug("While updating UNASSIGNED region " + regionName + " - node exists with no data" );
-    }
-    // If there is data in the ZNode, do not update if it is already correct
-    else {
-      HBaseEventType curState = HBaseEventType.fromByte(oldData[0]);
-      HBaseEventType newState = HBaseEventType.fromByte(data[0]);
-      // If the znode has the right state already, do not update it. Updating
-      // the znode again and again will bump up the zk version. This may cause
-      // the region server to fail. The RS expects that the znode is never
-      // updated by anyone else while it is opening/closing a region.
-      if(curState == newState) {
-        LOG.debug("No need to update UNASSIGNED region " + regionName +
-                  " as it already exists in state = " + curState);
-        return;
-      }
-
-      // If the ZNode is in another state, then update it
-      LOG.debug("UNASSIGNED region " + regionName + " is currently in state = " +
-                curState + ", updating it to " + newState);
-    }
-    // Update the ZNode
-    synchronized(unassignedZNodesWatched) {
-      unassignedZNodesWatched.add(znode);
-      try {
-        writeZNode(znode, data, -1, true);
-      } catch (IOException e) {
-        LOG.error("Error writing data for " + znode + ", could not update state to " + (HBaseEventType.fromByte(data[0])));
-      }
-    }
-  }
-
-  /**
-   * This method will create a new region in transition entry in ZK with the
-   * speficied data if none exists. If one already exists, it will update the
-   * data with whatever is passed in.
-   *
-   * @param regionName - encoded name of the region
-   * @param data - serialized data for the region znode
-   */
-  public void createOrUpdateUnassignedRegion(String regionName, byte[] data) {
-    String znode = getZNode(getRegionInTransitionZNode(), regionName);
-    if(exists(znode, true)) {
-      updateUnassignedRegion(regionName, data);
-    }
-    else {
-      createUnassignedRegion(regionName, data);
-    }
-  }
-
-  public void deleteUnassignedRegion(String regionName) {
-    String znode = getZNode(getRegionInTransitionZNode(), regionName);
-    try {
-      LOG.debug("Deleting ZNode " + znode + " in ZooKeeper as region is open...");
-      synchronized(unassignedZNodesWatched) {
-        unassignedZNodesWatched.remove(znode);
-        deleteZNode(znode);
-      }
-    } catch (KeeperException.SessionExpiredException e) {
-      LOG.error("Zookeeper session has expired", e);
-      // if the session has expired try to reconnect to ZK, then perform query
-      try {
-        // TODO: ZK-REFACTOR: should just quit on reconnect??
-        reconnectToZk();
-        synchronized(unassignedZNodesWatched) {
-          unassignedZNodesWatched.remove(znode);
-          deleteZNode(znode);
-        }
-      } catch (IOException e1) {
-        LOG.error("Error reconnecting to zookeeper", e1);
-        throw new RuntimeException("Error reconnecting to zookeeper", e1);
-      } catch (KeeperException.SessionExpiredException e1) {
-        LOG.error("Error reading after reconnecting to zookeeper", e1);
-        throw new RuntimeException("Error reading after reconnecting to zookeeper", e1);
-      } catch (KeeperException e1) {
-        LOG.error("Error reading after reconnecting to zookeeper", e1);
-      } catch (InterruptedException e1) {
-        LOG.error("Error reading after reconnecting to zookeeper", e1);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Error deleting region " + regionName, e);
-    } catch (InterruptedException e) {
-      LOG.error("Error deleting region " + regionName, e);
-    }
-  }
-
-  /**
-   * Atomically adds a watch and reads data from the unwatched znodes in the
-   * UNASSGINED region. This works because the master is the only person
-   * deleting nodes.
-   * @param znode
-   * @return
-   */
-  public List<ZNodePathAndData> watchAndGetNewChildren(String znode) {
-    List<String> nodes = null;
-    List<ZNodePathAndData> newNodes = new ArrayList<ZNodePathAndData>();
-    try {
-      if (checkExistenceOf(znode)) {
-        synchronized(unassignedZNodesWatched) {
-          nodes = zooKeeper.getChildren(znode, this);
-          for (String node : nodes) {
-            String znodePath = joinPath(znode, node);
-            if(!unassignedZNodesWatched.contains(znodePath)) {
-              byte[] data = getDataAndWatch(znode, node, this);
-              newNodes.add(new ZNodePathAndData(znodePath, data));
-              unassignedZNodesWatched.add(znodePath);
-            }
-          }
-        }
-      }
-    } catch (KeeperException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
-    } catch (InterruptedException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
-    }
-    return newNodes;
-  }
 
   public static class ZNodePathAndData {
     private String zNodePath;

Modified: hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java (original)
+++ hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java Fri Sep 24 20:48:55 2010
@@ -30,9 +30,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventHandlerListener;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -89,7 +86,7 @@ public class TestMaster {
     CountDownLatch aboutToOpen = new CountDownLatch(1);
     CountDownLatch proceed = new CountDownLatch(1);
     RegionOpenListener list = new RegionOpenListener(aboutToOpen, proceed);
-    HBaseEventHandler.registerListener(list);
+    m.getRegionServerOperationQueue().registerRegionServerOperationListener(list);
 
     LOG.info("Splitting table");
     admin.split(TABLENAME);
@@ -112,7 +109,7 @@ public class TestMaster {
     }
   }
 
-  static class RegionOpenListener implements HBaseEventHandlerListener {
+  static class RegionOpenListener implements RegionServerOperationListener {
     CountDownLatch aboutToOpen, proceed;
 
     public RegionOpenListener(CountDownLatch aboutToOpen, CountDownLatch proceed)
@@ -122,9 +119,9 @@ public class TestMaster {
     }
 
     @Override
-    public void afterProcess(HBaseEventHandler event) {
-      if (event.getHBEvent() != HBaseEventType.RS2ZK_REGION_OPENED) {
-        return;
+    public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
+      if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_OPEN)) {
+        return true;
       }
       try {
         aboutToOpen.countDown();
@@ -132,11 +129,16 @@ public class TestMaster {
       } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
       }
-      return;
+      return true;
     }
 
     @Override
-    public void beforeProcess(HBaseEventHandler event) {
+    public boolean process(RegionServerOperation op) throws IOException {
+      return true;
+    }
+
+    @Override
+    public void processed(RegionServerOperation op) {
     }
   }
 

Modified: hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (original)
+++ hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java Fri Sep 24 20:48:55 2010
@@ -1,90 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestRestartCluster {
-  private static final Log LOG = LogFactory.getLog(TestRestartCluster.class);
-  private static Configuration conf;
-  private static HBaseTestingUtility utility;
-  private static ZooKeeperWrapper zkWrapper;
-  private static final byte[] TABLENAME = Bytes.toBytes("master_transitions");
-  private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a")};
-  
-  @BeforeClass public static void beforeAllTests() throws Exception {
-    conf = HBaseConfiguration.create();
-    utility = new HBaseTestingUtility(conf);
-  }
-
-  @AfterClass public static void afterAllTests() throws IOException {
-    utility.shutdownMiniCluster();
-  }
-
-  @Before public void setup() throws IOException {
-  }
-
-  @Test (timeout=300000) public void testRestartClusterAfterKill()throws Exception {
-    utility.startMiniZKCluster();
-    zkWrapper = ZooKeeperWrapper.createInstance(conf, "cluster1");
-
-    // create the unassigned region, throw up a region opened state for META
-    String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
-    zkWrapper.createZNodeIfNotExists(unassignedZNode);
-    byte[] data = null;
-    HBaseEventType hbEventType = HBaseEventType.RS2ZK_REGION_OPENED;
-    try {
-      data = Writables.getBytes(new RegionTransitionEventData(hbEventType, HMaster.MASTER));
-    } catch (IOException e) {
-      LOG.error("Error creating event data for " + hbEventType, e);
-    }
-    zkWrapper.createOrUpdateUnassignedRegion(
-        HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data);
-    zkWrapper.createOrUpdateUnassignedRegion(
-        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data);
-    LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
-    
-    // start the HB cluster
-    LOG.info("Starting HBase cluster...");
-    utility.startMiniCluster(2);  
-    
-    utility.createTable(TABLENAME, FAMILIES);
-    LOG.info("Created a table, waiting for table to be available...");
-    utility.waitTableAvailable(TABLENAME, 60*1000);
-
-    LOG.info("Master deleted unassgined region and started up successfully.");
-  }
-}

Modified: hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java (original)
+++ hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java Fri Sep 24 20:48:55 2010
@@ -1,241 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.ProcessRegionClose;
-import org.apache.hadoop.hbase.master.RegionServerOperation;
-import org.apache.hadoop.hbase.master.RegionServerOperationListener;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Writables;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestZKBasedCloseRegion {
-  private static final Log LOG = LogFactory.getLog(TestZKBasedCloseRegion.class);
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final String TABLENAME = "master_transitions";
-  private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
-    Bytes.toBytes("b"), Bytes.toBytes("c")};
-
-  @BeforeClass public static void beforeAllTests() throws Exception {
-    Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean("dfs.support.append", true);
-    c.setInt("hbase.regionserver.info.port", 0);
-    c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000);
-    TEST_UTIL.startMiniCluster(2);
-    TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
-    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
-    int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
-    waitUntilAllRegionsAssigned(countOfRegions);
-    addToEachStartKey(countOfRegions);
-  }
-
-  @AfterClass public static void afterAllTests() throws IOException {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before public void setup() throws IOException {
-    if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
-      // Need at least two servers.
-      LOG.info("Started new server=" +
-        TEST_UTIL.getHBaseCluster().startRegionServer());
-      
-    }
-  }
-
-  @Test (timeout=300000) public void testCloseRegion()
-  throws Exception {
-    LOG.info("Running testCloseRegion");
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
-
-    int rsIdx = 0;
-    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
-    Collection<HRegion> regions = regionServer.getOnlineRegions();
-    HRegion region = regions.iterator().next();
-    LOG.debug("Asking RS to close region " + region.getRegionNameAsString());
-
-    AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
-    RegionServerOperationListener listener = 
-      new CloseRegionEventListener(region.getRegionNameAsString(), closeEventProcessed);
-    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
-    master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener);
-    HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, 
-                                   region.getRegionInfo(),
-                                   Bytes.toBytes("Forcing close in test")
-                                  );
-    TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg);
-    
-    synchronized(closeEventProcessed) {
-      // wait for 3 minutes
-      closeEventProcessed.wait(3*60*1000);
-    }
-    if(!closeEventProcessed.get()) {
-      throw new Exception("Timed out, close event not called on master.");
-    }
-    else {
-      LOG.info("Done with test, RS informed master successfully.");
-    }
-  }
-  
-  public static class CloseRegionEventListener implements RegionServerOperationListener {
-    
-    private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
-    String regionToClose;
-    AtomicBoolean closeEventProcessed;
-
-    public CloseRegionEventListener(String regionToClose, AtomicBoolean closeEventProcessed) {
-      this.regionToClose = regionToClose;
-      this.closeEventProcessed = closeEventProcessed;
-    }
-
-    @Override
-    public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
-      return true;
-    }
-
-    @Override
-    public boolean process(RegionServerOperation op) throws IOException {
-      return true;
-    }
-
-    @Override
-    public void processed(RegionServerOperation op) {
-      LOG.debug("Master processing object: " + op.getClass().getCanonicalName());
-      if(op instanceof ProcessRegionClose) {
-        ProcessRegionClose regionCloseOp = (ProcessRegionClose)op;
-        String region = regionCloseOp.getRegionInfo().getRegionNameAsString();
-        LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
-        if(regionToClose.equals(region)) {
-          closeEventProcessed.set(true);
-        }
-        synchronized(closeEventProcessed) {
-          closeEventProcessed.notifyAll();
-        }
-      }
-    }
-    
-  }
-  
-
-  private static void waitUntilAllRegionsAssigned(final int countOfRegions)
-  throws IOException {
-    HTable meta = new HTable(TEST_UTIL.getConfiguration(),
-      HConstants.META_TABLE_NAME);
-    while (true) {
-      int rows = 0;
-      Scan scan = new Scan();
-      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-      ResultScanner s = meta.getScanner(scan);
-      for (Result r = null; (r = s.next()) != null;) {
-        byte [] b =
-          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-        if (b == null || b.length <= 0) break;
-        rows++;
-      }
-      s.close();
-      // If I get to here and all rows have a Server, then all have been assigned.
-      if (rows == countOfRegions) break;
-      LOG.info("Found=" + rows);
-      Threads.sleep(1000); 
-    }
-  }
-
-  /*
-   * Add to each of the regions in .META. a value.  Key is the startrow of the
-   * region (except its 'aaa' for first region).  Actual value is the row name.
-   * @param expected
-   * @return
-   * @throws IOException
-   */
-  private static int addToEachStartKey(final int expected) throws IOException {
-    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
-    HTable meta = new HTable(TEST_UTIL.getConfiguration(),
-        HConstants.META_TABLE_NAME);
-    int rows = 0;
-    Scan scan = new Scan();
-    scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
-    ResultScanner s = meta.getScanner(scan);
-    for (Result r = null; (r = s.next()) != null;) {
-      byte [] b =
-        r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
-      if (b == null || b.length <= 0) break;
-      HRegionInfo hri = Writables.getHRegionInfo(b);
-      // If start key, add 'aaa'.
-      byte [] row = getStartKey(hri);
-      Put p = new Put(row);
-      p.add(getTestFamily(), getTestQualifier(), row);
-      t.put(p);
-      rows++;
-    }
-    s.close();
-    Assert.assertEquals(expected, rows);
-    return rows;
-  }
-
-  private static byte [] getStartKey(final HRegionInfo hri) {
-    return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
-        Bytes.toBytes("aaa"): hri.getStartKey();
-  }
-
-  private static byte [] getTestFamily() {
-    return FAMILIES[0];
-  }
-
-  private static byte [] getTestQualifier() {
-    return getTestFamily();
-  }
-  
-  public static void main(String args[]) throws Exception {
-    TestZKBasedCloseRegion.beforeAllTests();
-    
-    TestZKBasedCloseRegion test = new TestZKBasedCloseRegion();
-    test.setup();
-    test.testCloseRegion();
-    
-    TestZKBasedCloseRegion.afterAllTests();
-  }
-}

Modified: hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java (original)
+++ hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java Fri Sep 24 20:48:55 2010
@@ -1,268 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.ProcessRegionClose;
-import org.apache.hadoop.hbase.master.ProcessRegionOpen;
-import org.apache.hadoop.hbase.master.RegionServerOperation;
-import org.apache.hadoop.hbase.master.RegionServerOperationListener;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Writables;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestZKBasedReopenRegion {
-  private static final Log LOG = LogFactory.getLog(TestZKBasedReopenRegion.class);
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final String TABLENAME = "master_transitions";
-  private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
-    Bytes.toBytes("b"), Bytes.toBytes("c")};
-
-  @BeforeClass public static void beforeAllTests() throws Exception {
-    Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean("dfs.support.append", true);
-    c.setInt("hbase.regionserver.info.port", 0);
-    c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000);
-    TEST_UTIL.startMiniCluster(2);
-    TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
-    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
-    int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
-    waitUntilAllRegionsAssigned(countOfRegions);
-    addToEachStartKey(countOfRegions);
-  }
-
-  @AfterClass public static void afterAllTests() throws IOException {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before public void setup() throws IOException {
-    if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
-      // Need at least two servers.
-      LOG.info("Started new server=" +
-        TEST_UTIL.getHBaseCluster().startRegionServer());
-      
-    }
-  }
-
-  @Test (timeout=300000) public void testOpenRegion()
-  throws Exception {
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
-
-    int rsIdx = 0;
-    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
-    Collection<HRegion> regions = regionServer.getOnlineRegions();
-    HRegion region = regions.iterator().next();
-    LOG.debug("Asking RS to close region " + region.getRegionNameAsString());
-
-    AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
-    AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
-    RegionServerOperationListener listener = 
-      new ReopenRegionEventListener(region.getRegionNameAsString(), 
-                                    closeEventProcessed,
-                                    reopenEventProcessed);
-    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
-    master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener);
-    HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, 
-                                   region.getRegionInfo(),
-                                   Bytes.toBytes("Forcing close in test")
-                                  );
-    TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg);
-    
-    synchronized(closeEventProcessed) {
-      closeEventProcessed.wait(3*60*1000);
-    }
-    if(!closeEventProcessed.get()) {
-      throw new Exception("Timed out, close event not called on master.");
-    }
-
-    synchronized(reopenEventProcessed) {
-      reopenEventProcessed.wait(3*60*1000);
-    }
-    if(!reopenEventProcessed.get()) {
-      throw new Exception("Timed out, open event not called on master after region close.");
-    }    
-    
-    LOG.info("Done with test, RS informed master successfully.");
-  }
-  
-  public static class ReopenRegionEventListener implements RegionServerOperationListener {
-    
-    private static final Log LOG = LogFactory.getLog(ReopenRegionEventListener.class);
-    String regionToClose;
-    AtomicBoolean closeEventProcessed;
-    AtomicBoolean reopenEventProcessed;
-
-    public ReopenRegionEventListener(String regionToClose, 
-                                     AtomicBoolean closeEventProcessed,
-                                     AtomicBoolean reopenEventProcessed) {
-      this.regionToClose = regionToClose;
-      this.closeEventProcessed = closeEventProcessed;
-      this.reopenEventProcessed = reopenEventProcessed;
-    }
-
-    @Override
-    public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
-      return true;
-    }
-
-    @Override
-    public boolean process(RegionServerOperation op) throws IOException {
-      return true;
-    }
-
-    @Override
-    public void processed(RegionServerOperation op) {
-      LOG.debug("Master processing object: " + op.getClass().getCanonicalName());
-      if(op instanceof ProcessRegionClose) {
-        ProcessRegionClose regionCloseOp = (ProcessRegionClose)op;
-        String region = regionCloseOp.getRegionInfo().getRegionNameAsString();
-        LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
-        if(regionToClose.equals(region)) {
-          closeEventProcessed.set(true);
-        }
-        synchronized(closeEventProcessed) {
-          closeEventProcessed.notifyAll();
-        }
-      }
-      // Wait for open event AFTER we have closed the region
-      if(closeEventProcessed.get()) {
-        if(op instanceof ProcessRegionOpen) {
-          ProcessRegionOpen regionOpenOp = (ProcessRegionOpen)op;
-          String region = regionOpenOp.getRegionInfo().getRegionNameAsString();
-          LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
-          if(regionToClose.equals(region)) {
-            reopenEventProcessed.set(true);
-          }
-          synchronized(reopenEventProcessed) {
-            reopenEventProcessed.notifyAll();
-          }
-        }        
-      }
-      
-    }
-    
-  }
-  
-
-  private static void waitUntilAllRegionsAssigned(final int countOfRegions)
-  throws IOException {
-    HTable meta = new HTable(TEST_UTIL.getConfiguration(),
-      HConstants.META_TABLE_NAME);
-    while (true) {
-      int rows = 0;
-      Scan scan = new Scan();
-      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-      ResultScanner s = meta.getScanner(scan);
-      for (Result r = null; (r = s.next()) != null;) {
-        byte [] b =
-          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-        if (b == null || b.length <= 0) break;
-        rows++;
-      }
-      s.close();
-      // If I get to here and all rows have a Server, then all have been assigned.
-      if (rows == countOfRegions) break;
-      LOG.info("Found=" + rows);
-      Threads.sleep(1000); 
-    }
-  }
-
-  /*
-   * Add to each of the regions in .META. a value.  Key is the startrow of the
-   * region (except its 'aaa' for first region).  Actual value is the row name.
-   * @param expected
-   * @return
-   * @throws IOException
-   */
-  private static int addToEachStartKey(final int expected) throws IOException {
-    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
-    HTable meta = new HTable(TEST_UTIL.getConfiguration(),
-        HConstants.META_TABLE_NAME);
-    int rows = 0;
-    Scan scan = new Scan();
-    scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
-    ResultScanner s = meta.getScanner(scan);
-    for (Result r = null; (r = s.next()) != null;) {
-      byte [] b =
-        r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
-      if (b == null || b.length <= 0) break;
-      HRegionInfo hri = Writables.getHRegionInfo(b);
-      // If start key, add 'aaa'.
-      byte [] row = getStartKey(hri);
-      Put p = new Put(row);
-      p.add(getTestFamily(), getTestQualifier(), row);
-      t.put(p);
-      rows++;
-    }
-    s.close();
-    Assert.assertEquals(expected, rows);
-    return rows;
-  }
-
-  private static byte [] getStartKey(final HRegionInfo hri) {
-    return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
-        Bytes.toBytes("aaa"): hri.getStartKey();
-  }
-
-  private static byte [] getTestFamily() {
-    return FAMILIES[0];
-  }
-
-  private static byte [] getTestQualifier() {
-    return getTestFamily();
-  }
-  
-  public static void main(String args[]) throws Exception {
-    TestZKBasedReopenRegion.beforeAllTests();
-    
-    TestZKBasedReopenRegion test = new TestZKBasedReopenRegion();
-    test.setup();
-    test.testOpenRegion();
-    
-    TestZKBasedReopenRegion.afterAllTests();
-  }
-}

Modified: hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1001066&r1=1001065&r2=1001066&view=diff
==============================================================================
--- hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89.20100924/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Fri Sep 24 20:48:55 2010
@@ -86,6 +86,7 @@ public class TestHLogSplit {
     INSERT_GARBAGE_ON_FIRST_LINE,
     INSERT_GARBAGE_IN_THE_MIDDLE,
     APPEND_GARBAGE,
+    TRUNCATE,
   }
 
   @BeforeClass
@@ -274,7 +275,8 @@ public class TestHLogSplit {
     }
   }
 
-  @Test
+  // TODO: fix this test (HBASE-2935)
+  //@Test
   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, true);
 
@@ -299,6 +301,36 @@ public class TestHLogSplit {
   }
 
   @Test
+  public void testEOFisIgnored() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    final String REGION = "region__1";
+    regions.removeAll(regions);
+    regions.add(REGION);
+
+    int entryCount = 10;
+    Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
+    generateHLogs(1, entryCount, -1);
+    corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
+
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+    Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
+
+    int actualCount = 0;
+    HLog.Reader in = HLog.getReader(fs, splitLog, conf);
+    HLog.Entry entry;
+    while ((entry = in.next()) != null) ++actualCount;
+    assertEquals(entryCount-1, actualCount);
+    
+    // should not have stored the EOF files as corrupt
+    FileStatus[] archivedLogs = fs.listStatus(corruptDir);
+    assertEquals(archivedLogs.length, 0);
+  }
+  
+  @Test
   public void testLogsGetArchivedAfterSplit() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
 
@@ -314,7 +346,8 @@ public class TestHLogSplit {
 
 
 
-  @Test(expected = IOException.class)
+  // TODO: fix this test (HBASE-2935)
+  //@Test(expected = IOException.class)
   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
     generateHLogs(Integer.MAX_VALUE);
@@ -325,7 +358,8 @@ public class TestHLogSplit {
     HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
   }
 
-  @Test
+  // TODO: fix this test (HBASE-2935)
+  //@Test
   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
     generateHLogs(-1);
@@ -652,6 +686,14 @@ public class TestHLogSplit {
         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
         closeOrFlush(close, out);
         break;
+        
+      case TRUNCATE:
+        fs.delete(path, false);
+        out = fs.create(path);
+        out.write(corrupted_bytes, 0, fileSize-32);
+        closeOrFlush(close, out);
+        
+        break;
     }
 
 



Mime
View raw message