hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r991397 [9/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/or...
Date Tue, 31 Aug 2010 23:51:50 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseException.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseException.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Reports a problem with a lease
+ */
+public class LeaseException extends DoNotRetryIOException {
+
+  private static final long serialVersionUID = 8179703995292418650L;
+
+  /** Default constructor; no detail message. */
+  public LeaseException() {
+    super();
+  }
+
+  /**
+   * @param message detail message describing the lease problem
+   */
+  public LeaseException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseListener.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseListener.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LeaseListener.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+/**
+ * LeaseListener is an interface meant to be implemented by users of the Leases
+ * class.
+ *
+ * It receives events from the Leases class about the status of its accompanying
+ * lease.  Users of the Leases class can use a LeaseListener subclass to, for
+ * example, clean up resources after a lease has expired.
+ */
+public interface LeaseListener {
+  /**
+   * Called when the accompanying lease expires.  Implementations typically
+   * use this callback to clean up resources held under the lease.
+   */
+  public void leaseExpired();
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,281 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+
+import java.io.IOException;
+
+/**
+ * Leases
+ *
+ * There are several server classes in HBase that need to track external
+ * clients that occasionally send heartbeats.
+ *
+ * <p>These external clients hold resources in the server class.
+ * Those resources need to be released if the external client fails to send a
+ * heartbeat after some interval of time passes.
+ *
+ * <p>The Leases class is a general reusable class for this kind of pattern.
+ * An instance of the Leases class will create a thread to do its dirty work.
+ * You should close() the instance if you want to clean up the thread properly.
+ *
+ * <p>
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, rather than the Chore
+ * sleep time which is invariant.
+ */
+public class Leases extends Thread {
+  private static final Log LOG = LogFactory.getLog(Leases.class.getName());
+  // Length of time (ms) a lease stays valid after creation or renewal.
+  private final int leasePeriod;
+  // How long (ms) the monitor thread blocks in poll() awaiting expirations.
+  private final int leaseCheckFrequency;
+  // Leases pop out of this delay queue as they expire, in expiration order.
+  private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
+  // Lease name -> Lease.  Mutations are guarded by synchronizing on leaseQueue.
+  protected final Map<String, Lease> leases = new HashMap<String, Lease>();
+  // Set by close()/closeAfterLeasesExpire(); read by the monitor loop.
+  private volatile boolean stopRequested = false;
+
+  /**
+   * Creates a lease monitor
+   *
+   * @param leasePeriod - length of time (milliseconds) that the lease is valid
+   * @param leaseCheckFrequency - how often the lease should be checked
+   * (milliseconds)
+   */
+  public Leases(final int leasePeriod, final int leaseCheckFrequency) {
+    this.leasePeriod = leasePeriod;
+    this.leaseCheckFrequency = leaseCheckFrequency;
+  }
+
+  /**
+   * @see java.lang.Thread#run()
+   */
+  @Override
+  public void run() {
+    // Run until a stop is requested; once requested, keep draining until the
+    // queue of outstanding leases is empty, then fall through to close().
+    while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
+      Lease lease = null;
+      try {
+        lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        continue;
+      } catch (ConcurrentModificationException e) {
+        continue;
+      } catch (Throwable e) {
+        LOG.fatal("Unexpected exception killed leases thread", e);
+        break;
+      }
+      if (lease == null) {
+        // poll timed out with nothing expired; loop around and re-check.
+        continue;
+      }
+      // A lease expired.  Run the expired code before removing from queue
+      // since its presence in queue is used to see if lease exists still.
+      if (lease.getListener() == null) {
+        LOG.error("lease listener is null for lease " + lease.getLeaseName());
+      } else {
+        lease.getListener().leaseExpired();
+      }
+      synchronized (leaseQueue) {
+        leases.remove(lease.getLeaseName());
+      }
+    }
+    close();
+  }
+
+  /**
+   * Shuts down this lease instance when all outstanding leases expire.
+   * Like {@link #close()} but rather than violently end all leases, waits
+   * first on extant leases to finish.  Use this method if the lease holders
+   * could lose data, leak locks, etc.  Presumes client has shutdown
+   * allocation of new leases.
+   */
+  public void closeAfterLeasesExpire() {
+    this.stopRequested = true;
+  }
+
+  /**
+   * Shut down this Leases instance.  All pending leases will be destroyed,
+   * without any cancellation calls.
+   */
+  public void close() {
+    LOG.info(Thread.currentThread().getName() + " closing leases");
+    this.stopRequested = true;
+    synchronized (leaseQueue) {
+      leaseQueue.clear();
+      leases.clear();
+      // NOTE(review): nothing in this class wait()s on leaseQueue (the monitor
+      // thread uses poll with a timeout), so this notifyAll looks vestigial --
+      // confirm before removing.
+      leaseQueue.notifyAll();
+    }
+    LOG.info(Thread.currentThread().getName() + " closed leases");
+  }
+
+  /**
+   * Obtain a lease
+   *
+   * @param leaseName name of the lease
+   * @param listener listener that will process lease expirations
+   * @throws LeaseStillHeldException if a lease of this name already exists
+   */
+  public void createLease(String leaseName, final LeaseListener listener)
+  throws LeaseStillHeldException {
+    if (stopRequested) {
+      // Requests arriving after shutdown was requested are silently dropped.
+      return;
+    }
+    Lease lease = new Lease(leaseName, listener,
+        System.currentTimeMillis() + leasePeriod);
+    synchronized (leaseQueue) {
+      if (leases.containsKey(leaseName)) {
+        throw new LeaseStillHeldException(leaseName);
+      }
+      leases.put(leaseName, lease);
+      leaseQueue.add(lease);
+    }
+  }
+
+  /**
+   * Thrown if we are asked to create a lease but lease on passed name already
+   * exists.
+   */
+  @SuppressWarnings("serial")
+  public static class LeaseStillHeldException extends IOException {
+    private final String leaseName;
+
+    /**
+     * @param name name of the lease that is still held
+     */
+    public LeaseStillHeldException(final String name) {
+      this.leaseName = name;
+    }
+
+    /** @return name of lease */
+    public String getName() {
+      return this.leaseName;
+    }
+  }
+
+  /**
+   * Renew a lease
+   *
+   * @param leaseName name of lease
+   * @throws LeaseException if the named lease does not exist or has expired
+   */
+  public void renewLease(final String leaseName) throws LeaseException {
+    synchronized (leaseQueue) {
+      Lease lease = leases.get(leaseName);
+      // We need to check to see if the remove is successful as the poll in the run()
+      // method could have completed between the get and the remove which will result
+      // in a corrupt leaseQueue.
+      if (lease == null || !leaseQueue.remove(lease)) {
+        throw new LeaseException("lease '" + leaseName +
+                "' does not exist or has already expired");
+      }
+      lease.setExpirationTime(System.currentTimeMillis() + leasePeriod);
+      leaseQueue.add(lease);
+    }
+  }
+
+  /**
+   * Client explicitly cancels a lease.
+   *
+   * @param leaseName name of lease
+   * @throws LeaseException if the named lease does not exist
+   */
+  public void cancelLease(final String leaseName) throws LeaseException {
+    synchronized (leaseQueue) {
+      Lease lease = leases.remove(leaseName);
+      if (lease == null) {
+        throw new LeaseException("lease '" + leaseName + "' does not exist");
+      }
+      leaseQueue.remove(lease);
+    }
+  }
+
+  /** This class tracks a single Lease. */
+  private static class Lease implements Delayed {
+    private final String leaseName;
+    private final LeaseListener listener;
+    private long expirationTime;
+
+    Lease(final String leaseName, LeaseListener listener, long expirationTime) {
+      this.leaseName = leaseName;
+      this.listener = listener;
+      this.expirationTime = expirationTime;
+    }
+
+    /** @return the lease name */
+    public String getLeaseName() {
+      return leaseName;
+    }
+
+    /** @return listener */
+    public LeaseListener getListener() {
+      return this.listener;
+    }
+
+    // Equality and hashCode are based solely on the lease name, so two Lease
+    // instances with the same name are interchangeable in the queue and map.
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      return this.hashCode() == ((Lease) obj).hashCode();
+    }
+
+    @Override
+    public int hashCode() {
+      return this.leaseName.hashCode();
+    }
+
+    // Time remaining until expiration in the requested unit; negative once
+    // the lease has expired (which is what makes DelayQueue release it).
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.expirationTime - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    // Orders leases by remaining delay; same-named leases compare as equal.
+    public int compareTo(Delayed o) {
+      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+        o.getDelay(TimeUnit.MILLISECONDS);
+
+      return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
+    }
+
+    /** @param expirationTime the expirationTime to set */
+    public void setExpirationTime(long expirationTime) {
+      this.expirationTime = expirationTime;
+    }
+
+    /**
+     * Get the expiration time for that lease
+     * @return expiration time
+     */
+    public long getExpirationTime() {
+      return this.expirationTime;
+    }
+
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Tue Aug 31 23:51:44 2010
@@ -21,9 +21,15 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
@@ -37,26 +43,31 @@ import java.util.concurrent.locks.Reentr
  * can be interrupted when there is something to do, rather than the Chore
  * sleep time which is invariant.
  */
-class LogRoller extends Thread implements LogRollListener {
+class LogRoller extends Thread implements WALObserver {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
-  private final HRegionServer server;
+  private final Server server;
+  private final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
+  private final int threadWakeFrequency;
 
   /** @param server */
-  public LogRoller(final HRegionServer server) {
+  public LogRoller(final Server server, final RegionServerServices services) {
     super();
     this.server = server;
-    this.rollperiod =
-      this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
+    this.services = services;
+    this.rollperiod = this.server.getConfiguration().
+      getLong("hbase.regionserver.logroll.period", 3600000);
+    this.threadWakeFrequency = this.server.getConfiguration().
+      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
   }
 
   @Override
   public void run() {
-    while (!server.isStopRequested()) {
+    while (!server.isStopped()) {
       long now = System.currentTimeMillis();
       boolean periodic = false;
       if (!rollLog.get()) {
@@ -64,7 +75,7 @@ class LogRoller extends Thread implement
         if (!periodic) {
           synchronized (rollLog) {
             try {
-              rollLog.wait(server.threadWakeFrequency);
+              rollLog.wait(this.threadWakeFrequency);
             } catch (InterruptedException e) {
               // Fall through
             }
@@ -79,27 +90,21 @@ class LogRoller extends Thread implement
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
         this.lastrolltime = now;
-        byte [][] regionsToFlush = server.getLog().rollWriter();
+        // This is array of actual region names.
+        byte [][] regionsToFlush = this.services.getWAL().rollWriter();
         if (regionsToFlush != null) {
           for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
       } catch (FailedLogCloseException e) {
-        LOG.fatal("Forcing server shutdown", e);
-        server.checkFileSystem();
         server.abort("Failed log close in log roller", e);
       } catch (java.net.ConnectException e) {
-        LOG.fatal("Forcing server shutdown", e);
-        server.checkFileSystem();
-        server.abort("Failed connect in log roller", e);
+        server.abort("Failed log close in log roller", e);
       } catch (IOException ex) {
-        LOG.fatal("Log rolling failed with ioe: ",
-          RemoteExceptionHandler.checkIOException(ex));
-        server.checkFileSystem();
         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
-        server.abort("IOE in log roller", ex);
+        server.abort("IOE in log roller",
+          RemoteExceptionHandler.checkIOException(ex));
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
-        server.checkFileSystem();
         server.abort("Log rolling failed", ex);
       } finally {
         rollLog.set(false);
@@ -109,14 +114,17 @@ class LogRoller extends Thread implement
     LOG.info("LogRoller exiting.");
   }
 
+  /**
+   * @param region Encoded name of region to flush.
+   */
   private void scheduleFlush(final byte [] region) {
     boolean scheduled = false;
-    HRegion r = this.server.getOnlineRegion(region);
+    HRegion r = this.services.getFromOnlineRegions(Bytes.toString(region));
     FlushRequester requester = null;
     if (r != null) {
-      requester = this.server.getFlushRequester();
+      requester = this.services.getFlushRequester();
       if (requester != null) {
-        requester.request(r);
+        requester.requestFlush(r);
         scheduled = true;
       }
     }
@@ -145,4 +153,15 @@ class LogRoller extends Thread implement
       rollLock.unlock();
     }
   }
+
+  @Override
+  public void logRolled(Path newFile) {
+    // Not interested
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+      WALEdit logEdit) {
+    // Not interested.
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Aug 31 23:51:44 2010
@@ -136,7 +136,7 @@ class MemStoreFlusher extends Thread imp
 
   @Override
   public void run() {
-    while (!this.server.isStopRequested()) {
+    while (!this.server.isStopped()) {
       FlushQueueEntry fqe = null;
       try {
         fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
@@ -164,7 +164,7 @@ class MemStoreFlusher extends Thread imp
     LOG.info(getName() + " exiting");
   }
 
-  public void request(HRegion r) {
+  public void requestFlush(HRegion r) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
@@ -212,7 +212,7 @@ class MemStoreFlusher extends Thread imp
           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
         }
-        this.server.compactSplitThread.compactionRequested(region, getName());
+        this.server.compactSplitThread.requestCompaction(region, getName());
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
@@ -247,7 +247,7 @@ class MemStoreFlusher extends Thread imp
     }
     try {
       if (region.flushcache()) {
-        server.compactSplitThread.compactionRequested(region, getName());
+        server.compactSplitThread.requestCompaction(region, getName());
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
@@ -325,7 +325,7 @@ class MemStoreFlusher extends Thread imp
       regionsToCompact.add(biggestMemStoreRegion);
     }
     for (HRegion region : regionsToCompact) {
-      server.compactSplitThread.compactionRequested(region, getName());
+      server.compactSplitThread.requestCompaction(region, getName());
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java Tue Aug 31 23:51:44 2010
@@ -19,23 +19,32 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.HRegionInfo;
-
 /**
- * Add and remove online regions.
+ * Interface to Map of online regions.  In the  Map, the key is the region's
+ * encoded name and the value is an {@link HRegion} instance.
  */
 interface OnlineRegions {
   /**
    * Add to online regions.
    * @param r
    */
-  void addToOnlineRegions(final HRegion r);
+  public void addToOnlineRegions(final HRegion r);
 
   /**
    * This method removes HRegion corresponding to hri from the Map of onlineRegions.
    *
-   * @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
-   * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
+   * @param encodedRegionName
+   * @return True if we removed a region from online list.
+   */
+  public boolean removeFromOnlineRegions(String encodedRegionName);
+
+  /**
+   * Return {@link HRegion} instance.
+   * Only works if caller is in same context, in same JVM. HRegion is not
+   * serializable.
+   * @param encodedRegionName
+   * @return HRegion for the passed encoded <code>encodedRegionName</code> or
+   * null if named region is not member of the online regions.
    */
-  HRegion removeFromOnlineRegions(HRegionInfo hri);
-}
+  public HRegion getFromOnlineRegions(String encodedRegionName);
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Services provided by {@link HRegionServer}
+ */
+public interface RegionServerServices extends OnlineRegions {
+  /**
+   * @return This region server's write-ahead log ({@link HLog}).
+   */
+  public HLog getWAL();
+
+  /**
+   * @return Implementation of {@link CompactionRequestor} or null.
+   */
+  public CompactionRequestor getCompactionRequester();
+  
+  /**
+   * @return Implementation of {@link FlushRequester} or null.
+   */
+  public FlushRequester getFlushRequester();
+
+  /**
+   * Return data structure that has Server address and startcode.
+   * @return The HServerInfo for this RegionServer.
+   */
+  public HServerInfo getServerInfo();
+
+  /**
+   * Tasks to perform after region open to complete deploy of region on
+   * regionserver
+   * @param r Region to open.
+   * @param ct Instance of {@link CatalogTracker}
+   * @param daughter True if this is daughter of a split
+   * @throws KeeperException
+   * @throws IOException
+   */
+  public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
+      final boolean daughter)
+  throws KeeperException, IOException;
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ShutdownHook.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ShutdownHook.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ShutdownHook.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ShutdownHook.java Tue Aug 31 23:51:44 2010
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.Threads;
 
 /**
@@ -99,7 +100,7 @@ class ShutdownHook {
       LOG.info("Shutdown hook starting; " + RUN_SHUTDOWN_HOOK + "=" + b +
         "; fsShutdownHook=" + this.fsShutdownHook);
       if (b) {
-        this.stop.stop();
+        this.stop.stop("Shutdown hook");
         Threads.shutdown(this.threadToJoin);
         if (this.fsShutdownHook != null) {
           LOG.info("Starting fs shutdown hook thread.");
@@ -198,8 +199,14 @@ class ShutdownHook {
   // Stoppable with nothing to stop.  Used below in main testing.
   static class DoNothingStoppable implements Stoppable {
     @Override
-    public void stop() {
-      // Nothing to do.
+    public boolean isStopped() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+      // TODO Auto-generated method stub
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Tue Aug 31 23:51:44 2010
@@ -30,16 +30,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.Progressable;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Executes region split as a "transaction".  Call {@link #prepare()} to setup
@@ -112,6 +112,8 @@ class SplitTransaction {
 
   /**
    * Constructor
+   * @param services So we can online new services.  If null, we'll skip onlining
+   * (Useful testing).
    * @param c Configuration to use running split
    * @param r Region to split
    * @param splitrow Row to split around
@@ -176,35 +178,23 @@ class SplitTransaction {
 
   /**
    * Run the transaction.
-   * @param or Object that can online/offline parent region.
+   * @param server Hosting server instance.
+   * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
    * @return Regions created
    * @see #rollback(OnlineRegions)
    */
-  public PairOfSameType<HRegion> execute(final OnlineRegions or) throws IOException {
-    return execute(or, or != null);
-  }
-
-  /**
-   * Run the transaction.
-   * @param or Object that can online/offline parent region.  Can be null (Tests
-   * will pass null).
-   * @param If <code>true</code>, update meta (set to false when testing).
-   * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
-   * @return Regions created
-   * @see #rollback(OnlineRegions)
-   */
-  PairOfSameType<HRegion> execute(final OnlineRegions or, final boolean updateMeta)
+  PairOfSameType<HRegion> execute(final Server server,
+      final RegionServerServices services)
   throws IOException {
     LOG.info("Starting split of region " + this.parent);
     if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
       throw new SplitAndCloseWriteLockNotHeld();
     }
 
-    // We'll need one of these later but get it now because if we fail there
-    // is nothing to undo.
-    HTable t = null;
-    if (updateMeta) t = getTable(this.parent.getConf());
+    // If true, no cluster to write meta edits into.
+    boolean testing =
+      server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
 
     createSplitDir(this.parent.getFilesystem(), this.splitdir);
     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
@@ -212,7 +202,9 @@ class SplitTransaction {
     List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
     this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
 
-    if (or != null) or.removeFromOnlineRegions(this.parent.getRegionInfo());
+    if (!testing) {
+      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+    }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
 
     splitStoreFiles(this.splitdir, hstoreFilesToSplit);
@@ -225,52 +217,39 @@ class SplitTransaction {
     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
     // add entry to journal BEFORE rather than AFTER the change.
     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
-    HRegion a = createDaughterRegion(this.hri_a);
+    HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);
 
     // Ditto
     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
-    HRegion b = createDaughterRegion(this.hri_b);
-
-    Put editParentPut = createOfflineParentPut();
-    if (t != null) t.put(editParentPut);
+    HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);
 
-    // The is the point of no return.  We are committed to the split now.  Up to
-    // a failure editing parent in meta or a crash of the hosting regionserver,
-    // we could rollback (or, if crash, we could cleanup on redeploy) but now
-    // meta has been changed, we can only go forward.  If the below last steps
-    // do not complete, repair has to be done by another agent.  For example,
-    // basescanner, at least up till master rewrite, would add daughter rows if
-    // missing from meta.  It could do this because the parent edit includes the
-    // daughter specs.  In Bigtable paper, they have another mechanism where
-    // some feedback to the master somehow flags it that split is incomplete and
-    // needs fixup.  Whatever the mechanism, its a TODO that we have some fixup.
-    
-    // I looked at writing the put of the parent edit above out to the WAL log
-    // before changing meta with the notion that should we fail, then on replay
-    // the offlining of the parent and addition of daughters up into meta could
-    // be reinserted.  The edits would have to be 'special' and given how our
-    // splits work, splitting by region, I think the replay would have to happen
-    // inside in the split code -- as soon as it saw one of these special edits,
-    // rather than write the edit out a file for the .META. region to replay or
-    // somehow, write it out to this regions edits file for it to handle on
-    // redeploy -- this'd be whacky, we'd be telling meta about a split during
-    // the deploy of the parent -- instead we'd have to play the edit inside
-    // in the split code somehow; this would involve a stop-the-splitting till
-    // meta had been edited which might hold up splitting a good while.
-
-    // Finish up the meta edits.  If these fail, another agent needs to do fixup
-    HRegionInfo hri = this.hri_a;
-    try {
-      if (t != null) t.put(createDaughterPut(hri));
-      hri = this.hri_b;
-      if (t != null) t.put(createDaughterPut(hri));
-    } catch (IOException e) {
-      // Don't let this out or we'll run rollback.
-      LOG.warn("Failed adding daughter " + hri.toString());
-    }
-    // This should not fail because the HTable instance we are using is not
-    // running a buffer -- its immediately flushing its puts.
-    if (t != null) t.close();
+    // Edit parent in meta
+    if (!testing) {
+      MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
+        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
+    }
+
+    // This is the point of no return.  We are committed to the split now.  We
+    // have still the daughter regions to open but meta has been changed.
+    // If we fail from here on out, we can not rollback so, we'll just abort.
+    // The meta has been changed though so there will need to be a fixup run
+    // during processing of the crashed server by master (TODO: Verify this in place).
+
+    // TODO: Could we be smarter about the sequence in which we do these steps?
+
+    if (!testing) {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, services, a);
+      DaughterOpener bOpener = new DaughterOpener(server, services, b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        bOpener.join();
+      } catch (InterruptedException e) {
+        server.abort("Exception running daughter opens", e);
+      }
+    }
 
     // Unlock if successful split.
     this.parent.lock.writeLock().unlock();
@@ -281,6 +260,70 @@ class SplitTransaction {
     return new PairOfSameType<HRegion>(a, b);
   }
 
+  /**
+   * Thread that opens a single daughter region; two of these are run in
+   * parallel by {@link #execute} so both daughters open concurrently.
+   * Any failure during the open aborts the hosting server, since at this
+   * stage of the split meta has already been edited and no rollback is
+   * possible.
+   */
+  class DaughterOpener extends Thread {
+    private final RegionServerServices services;
+    private final Server server;
+    private final HRegion r;
+
+    DaughterOpener(final Server s, final RegionServerServices services,
+        final HRegion r) {
+      // Thread name carries the server name and the daughter's encoded
+      // region name so stack traces and logs identify which open this is.
+      super(s.getServerName() + "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+      // Daemon thread so a hung open cannot keep the JVM alive.
+      setDaemon(true);
+      this.services = services;
+      this.server = s;
+      this.r = r;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, this.services, r);
+      } catch (Throwable t) {
+        // Catch Throwable, not just Exception: a failed daughter open after
+        // the meta edit cannot be rolled back, so abort and let server
+        // crash-processing repair state.
+        this.server.abort("Failed open of daughter " +
+          this.r.getRegionInfo().getRegionNameAsString(), t);
+      }
+    }
+  }
+
+  /**
+   * Open a daughter region and run its post-open deploy tasks (adding it to
+   * the online set and updating meta is done inside
+   * {@code postOpenDeployTasks}).  Progress during the open is reported via a
+   * {@link LoggingProgressable} so long-running opens show up in the log.
+   * @param server Hosting server; supplies configuration and catalog tracker.
+   * @param services Used to run post-open deploy tasks for the new region.
+   * @param daughter Daughter region to open.
+   * @throws IOException If the region open or the deploy tasks fail.
+   * @throws KeeperException If the zookeeper side of deploy fails.
+   */
+  void openDaughterRegion(final Server server,
+      final RegionServerServices services, final HRegion daughter)
+  throws IOException, KeeperException {
+    HRegionInfo hri = daughter.getRegionInfo();
+    LoggingProgressable reporter =
+      new LoggingProgressable(hri, server.getConfiguration());
+    HRegion r = daughter.openHRegion(reporter);
+    services.postOpenDeployTasks(r, server.getCatalogTracker(), true);
+  }
+
+  /**
+   * {@link Progressable} that logs an "Opening ..." message for a region,
+   * rate-limited to at most one message per configured interval
+   * ("hbase.regionserver.split.daughter.open.log.interval", default 10s).
+   */
+  static class LoggingProgressable implements Progressable {
+    private final HRegionInfo hri;
+    // Timestamp of the last emitted log line; -1 forces a log on first call.
+    private long lastLog = -1;
+    // Minimum milliseconds between log lines.
+    private final long interval;
+
+    LoggingProgressable(final HRegionInfo hri, final Configuration c) {
+      this.hri = hri;
+      this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
+        10000);
+    }
+
+    @Override
+    public void progress() {
+      long now = System.currentTimeMillis();
+      if (now - lastLog > this.interval) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = now;
+      }
+    }
+  }
+
   private static Path getSplitDir(final HRegion r) {
     return new Path(r.getRegionDir(), SPLITDIR);
   }
@@ -348,12 +391,14 @@ class SplitTransaction {
   }
 
   /**
-   * @param hri
+   * @param hri Spec. for daughter region to open.
+   * @param flusher Flusher this region should use.
    * @return Created daughter HRegion.
    * @throws IOException
    * @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
    */
-  HRegion createDaughterRegion(final HRegionInfo hri)
+  HRegion createDaughterRegion(final HRegionInfo hri,
+      final FlushRequester flusher)
   throws IOException {
     // Package private so unit tests have access.
     FileSystem fs = this.parent.getFilesystem();
@@ -361,7 +406,7 @@ class SplitTransaction {
       this.splitdir, hri);
     HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
       this.parent.getLog(), fs, this.parent.getConf(),
-      hri, null);
+      hri, flusher);
     HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
     return r;
   }
@@ -389,56 +434,6 @@ class SplitTransaction {
     return new Path(splitdir, hri.getEncodedName());
   }
 
-  /*
-   * @param r Parent region we want to edit.
-   * @return An HTable instance against the meta table that holds passed
-   * <code>r</code>; it has autoFlush enabled so we immediately send puts (No
-   * buffering enabled).
-   * @throws IOException
-   */
-  private HTable getTable(final Configuration conf) throws IOException {
-    // When a region is split, the META table needs to updated if we're
-    // splitting a 'normal' region, and the ROOT table needs to be
-    // updated if we are splitting a META region.
-    HTable t = null;
-    if (this.parent.getRegionInfo().isMetaTable()) {
-      t = new HTable(conf, HConstants.ROOT_TABLE_NAME);
-    } else {
-      t = new HTable(conf, HConstants.META_TABLE_NAME);
-    }
-    // Flush puts as we send them -- no buffering.
-    t.setAutoFlush(true);
-    return t;
-  }
-
-
-  private Put createOfflineParentPut() throws IOException  {
-    HRegionInfo editedParentRegionInfo =
-      new HRegionInfo(this.parent.getRegionInfo());
-    editedParentRegionInfo.setOffline(true);
-    editedParentRegionInfo.setSplit(true);
-    Put put = new Put(editedParentRegionInfo.getRegionName());
-    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-      Writables.getBytes(editedParentRegionInfo));
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-        HConstants.EMPTY_BYTE_ARRAY);
-    put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-        HConstants.EMPTY_BYTE_ARRAY);
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER,
-      Writables.getBytes(this.hri_a));
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
-      Writables.getBytes(this.hri_b));
-    return put;
-  }
-
-  private Put createDaughterPut(final HRegionInfo daughter)
-  throws IOException {
-    Put p = new Put(daughter.getRegionName());
-    p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-      Writables.getBytes(daughter));
-    return p;
-  }
-
   /**
    * @param or Object that can online/offline parent region.  Can be passed null
    * by unit tests.
@@ -537,4 +532,4 @@ class SplitTransaction {
     cleanupSplitDir(r.getFilesystem(), splitdir);
     LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Aug 31 23:51:44 2010
@@ -291,9 +291,6 @@ public class StoreFile {
    * @return This files maximum edit sequence id.
    */
   public long getMaxSequenceId() {
-    if (this.sequenceid == -1) {
-      throw new IllegalAccessError("Has not been initialized");
-    }
     return this.sequenceid;
   }
 
@@ -370,11 +367,9 @@ public class StoreFile {
    * @see #closeReader()
    */
   private Reader open() throws IOException {
-
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
-
     if (isReference()) {
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
           getBlockCache(), this.reference);
@@ -382,7 +377,6 @@ public class StoreFile {
       this.reader = new Reader(this.fs, this.path, getBlockCache(),
           this.inMemory);
     }
-
     // Load up indices and fileinfo.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
     // Read in our metadata.
@@ -409,6 +403,10 @@ public class StoreFile {
       } else {
         this.majorCompaction.set(mc);
       }
+    } else {
+      // Presume it is not major compacted if it doesn't explicitly say so
+      // HFileOutputFormat explicitly sets the major compacted key.
+      this.majorCompaction = new AtomicBoolean(false);
     }
 
     if (this.bloomType != BloomType.NONE) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Tue Aug 31 23:51:44 2010
@@ -59,5 +59,4 @@ interface StoreFlusher {
    * @throws IOException
    */
   boolean commit() throws IOException;
-
 }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Handles closing of the meta region on a region server.
+ */
+public class CloseMetaHandler extends CloseRegionHandler {
+  // Called when the master tells us to shut down a region via a close RPC.
+  public CloseMetaHandler(final Server server,
+      final RegionServerServices rsServices, final HRegionInfo regionInfo) {
+    this(server, rsServices, regionInfo, false, true);
+  }
+
+  // Called when the regionserver determines it is going down; not
+  // master-orchestrated, so caller controls abort/zk flags.
+  public CloseMetaHandler(final Server server,
+      final RegionServerServices rsServices,
+      final HRegionInfo regionInfo,
+      final boolean abort, final boolean zk) {
+    super(server, rsServices, regionInfo, abort, zk, EventType.M2RS_CLOSE_META);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,187 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles closing of a region on a region server.
+ * <p>
+ * Flow: optionally mark the region CLOSING in zookeeper, remove the region
+ * from the online set and close it, then optionally transition the
+ * zookeeper node to CLOSED so the master can act on the close.
+ */
+public class CloseRegionHandler extends EventHandler {
+  // NOTE on priorities shutting down.  There are none for close. There are some
+  // for open.  I think that is right.  On shutdown, we want the meta to close
+  // before root and both to close after the user regions have closed.  What
+  // about the case where master tells us to shutdown a catalog region and we
+  // have a running queue of user regions to close?
+  private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class);
+
+  // Sentinel version used by the ZKAssign calls below to signal failure.
+  private static final int FAILED = -1;
+
+  private final RegionServerServices rsServices;
+
+  private final HRegionInfo regionInfo;
+
+  // If true, the hosting server is aborting.  Region close process is different
+  // when we are aborting.
+  private final boolean abort;
+
+  // Update zk on closing transitions. Usually true.  Its false if cluster
+  // is going down.  In this case, its the rs that initiates the region
+  // close -- not the master process so state up in zk will unlikely be
+  // CLOSING.
+  private final boolean zk;
+
+  // This is executed after receiving a CLOSE RPC from the master.
+  public CloseRegionHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo) {
+    this(server, rsServices, regionInfo, false, true);
+  }
+
+  /**
+   * This method used internally by the RegionServer to close out regions.
+   * @param server Hosting server.
+   * @param rsServices Region server services; supplies the online-region set.
+   * @param regionInfo Region to close.
+   * @param abort If the regionserver is aborting.
+   * @param zk If the close should be noted out in zookeeper.
+   */
+  public CloseRegionHandler(final Server server,
+      final RegionServerServices rsServices,
+      final HRegionInfo regionInfo, final boolean abort, final boolean zk) {
+    this(server, rsServices,  regionInfo, abort, zk, EventType.M2RS_CLOSE_REGION);
+  }
+
+  protected CloseRegionHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo,
+      boolean abort, final boolean zk, EventType eventType) {
+    super(server, eventType);
+    this.server = server;
+    this.rsServices = rsServices;
+    this.regionInfo = regionInfo;
+    this.abort = abort;
+    this.zk = zk;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  @Override
+  public void process() {
+    String name = regionInfo.getRegionNameAsString();
+    LOG.debug("Processing close of " + name);
+    String encodedRegionName = regionInfo.getEncodedName();
+    // Check that this region is being served here
+    HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
+    if (region == null) {
+      LOG.warn("Received CLOSE for region " + name + " but currently not serving");
+      return;
+    }
+
+    int expectedVersion = FAILED;
+    if (this.zk) {
+      expectedVersion = setClosingState();
+      if (expectedVersion == FAILED) return;
+    }
+
+    // Close the region
+    try {
+      // TODO: If we need to keep updating CLOSING stamp to prevent against
+      //       a timeout if this is long-running, need to spin up a thread?
+      this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
+      region.close(abort);
+    } catch (IOException e) {
+      // Pass the exception to the logger; dropping it loses the root cause.
+      LOG.error("IOException closing region for " + regionInfo, e);
+      if (this.zk) deleteClosingState();
+      // NOTE(review): execution falls through to the CLOSED transition below
+      // even though the close failed (and the CLOSING node may have just
+      // been deleted) -- confirm whether this should return here instead.
+    }
+
+    if (this.zk) setClosedState(expectedVersion, region);
+
+    // Done!  Successful region close.
+    LOG.debug("Closed region " + region.getRegionNameAsString());
+  }
+
+  /**
+   * Transition ZK node to CLOSED
+   * @param expectedVersion Version of the CLOSING node we expect to update.
+   * @param region Region just closed; closed again if the transition clashes.
+   */
+  private void setClosedState(final int expectedVersion, final HRegion region) {
+    try {
+      if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo,
+          server.getServerName(), expectedVersion) == FAILED) {
+        LOG.warn("Completed the CLOSE of a region but when transitioning from " +
+            " CLOSING to CLOSED got a version mismatch, someone else clashed " +
+            "so now unassigning");
+        region.close();
+        return;
+      }
+    } catch (NullPointerException e) {
+      // I've seen NPE when table was deleted while close was running in unit tests.
+      LOG.warn("NPE during close -- catching and continuing...", e);
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
+      return;
+    } catch (IOException e) {
+      LOG.error("Failed to close region after failing to transition", e);
+      return;
+    }
+  }
+
+  /**
+   * Delete the region's CLOSING node in zookeeper.  Failure is logged only.
+   */
+  private void deleteClosingState() {
+    try {
+      ZKAssign.deleteClosingNode(server.getZooKeeper(),
+          this.regionInfo.getEncodedName());
+    } catch (KeeperException e1) {
+      // Pass the exception to the logger; dropping it loses the root cause.
+      LOG.error("Error deleting CLOSING node", e1);
+    }
+  }
+
+  /**
+   * Create ZK node in CLOSING state.
+   * @return The expectedVersion.  If -1, we failed setting CLOSING.
+   */
+  private int setClosingState() {
+    int expectedVersion = FAILED;
+    try {
+      if ((expectedVersion = ZKAssign.createNodeClosing(
+          server.getZooKeeper(), regionInfo, server.getServerName())) == FAILED) {
+        LOG.warn("Error creating node in CLOSING state, aborting close of "
+            + regionInfo.getRegionNameAsString());
+      }
+    } catch (KeeperException e) {
+      // Pass the exception to the logger; dropping it loses the root cause.
+      LOG.warn("Error creating node in CLOSING state, aborting close of "
+          + regionInfo.getRegionNameAsString(), e);
+    }
+    return expectedVersion;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Handles closing of the root region on a region server.
+ */
+public class CloseRootHandler extends CloseRegionHandler {
+  // This is executed after receiving a CLOSE RPC from the master for root.
+  public CloseRootHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo) {
+    this(server, rsServices, regionInfo, false, true);
+  }
+
+  // This is called directly by the regionserver when it has determined it is
+  // shutting down; caller controls the abort/zk flags.
+  public CloseRootHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo,
+      final boolean abort, final boolean zk) {
+    super(server, rsServices, regionInfo, abort, zk, EventType.M2RS_CLOSE_ROOT);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Handles opening of a meta region on a region server.
+ * <p>
+ * This is executed after receiving an OPEN RPC from the master for meta.
+ */
+public class OpenMetaHandler extends OpenRegionHandler {
+  // Delegates to OpenRegionHandler with the meta-specific event type.
+  public OpenMetaHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo) {
+    super(server,rsServices,  regionInfo, EventType.M2RS_OPEN_META);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,194 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.util.Progressable;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * Handles opening of a region on a region server.
+ * <p>
+ * This is executed after receiving an OPEN RPC from the master or client.
+ */
+public class OpenRegionHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
+
+  private final RegionServerServices rsServices;
+
+  private final HRegionInfo regionInfo;
+
+  public OpenRegionHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo) {
+    this(server, rsServices, regionInfo, EventType.M2RS_OPEN_REGION);
+  }
+
+  protected OpenRegionHandler(final Server server,
+      final RegionServerServices rsServices, final HRegionInfo regionInfo,
+      EventType eventType) {
+    super(server, eventType);
+    this.rsServices = rsServices;
+    this.regionInfo = regionInfo;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  /**
+   * Runs the open of {@link #regionInfo}: moves the region's ZK unassigned
+   * node OFFLINE -&gt; OPENING, instantiates the HRegion (ticking the OPENING
+   * znode via a Progressable so the transition is not timed out), updates the
+   * catalog through {@link RegionServerServices#postOpenDeployTasks}, and
+   * finally moves the znode OPENING -&gt; OPENED.  A version mismatch at any
+   * ZK transition means another actor changed the node; the open is then
+   * abandoned (and the region closed if it was already instantiated).
+   */
+  @Override
+  public void process() throws IOException {
+    LOG.debug("Processing open of " + regionInfo.getRegionNameAsString());
+    final String encodedName = regionInfo.getEncodedName();
+
+    // TODO: Previously we would check for root region availability (but only that it
+    // was initially available, does not check if it later went away)
+    // Do we need to wait on both root and meta to be available to open a region
+    // now since we edit meta?
+
+    // Check that this region is not already online
+    HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
+    if (region != null) {
+      LOG.warn("Attempting open of " + regionInfo.getRegionNameAsString() +
+        " but it's already online on this server");
+      return;
+    }
+
+    // -1 means the OFFLINE -> OPENING transition failed (already logged);
+    // someone else may own the region transition, so give up quietly.
+    int openingVersion = transitionZookeeperOfflineToOpening(encodedName);
+    if (openingVersion == -1) return;
+
+    // Open the region
+    final AtomicInteger openingInteger = new AtomicInteger(openingVersion);
+    try {
+      // Instantiate the region.  This also periodically updates OPENING.
+      region = HRegion.openHRegion(regionInfo, this.rsServices.getWAL(),
+          server.getConfiguration(), this.rsServices.getFlushRequester(),
+          new Progressable() {
+            public void progress() {
+              try {
+                int vsn = ZKAssign.retransitionNodeOpening(
+                    server.getZooKeeper(), regionInfo, server.getServerName(),
+                    openingInteger.get());
+                if (vsn == -1) {
+                  throw KeeperException.create(Code.BADVERSION);
+                }
+                openingInteger.set(vsn);
+              } catch (KeeperException e) {
+                server.abort("ZK exception refreshing OPENING node", e);
+              }
+            }
+      });
+    } catch (IOException e) {
+      LOG.error("IOException instantiating region for " + regionInfo +
+        "; resetting state of transition node from OPENING to OFFLINE");
+      try {
+        // TODO: We should rely on the master timing out OPENING instead of this
+        // TODO: What if this was a split open?  The RS made the OFFLINE
+        // znode, not the master.
+        ZKAssign.forceNodeOffline(server.getZooKeeper(), regionInfo,
+            server.getServerName());
+      } catch (KeeperException e1) {
+        LOG.error("Error forcing node back to OFFLINE from OPENING");
+        return;
+      }
+      return;
+    }
+
+    // Re-transition node to OPENING again to verify no one has stomped on us
+    openingVersion = openingInteger.get();
+    try {
+      if((openingVersion = ZKAssign.retransitionNodeOpening(
+          server.getZooKeeper(), regionInfo, server.getServerName(),
+          openingVersion)) == -1) {
+        LOG.warn("Completed the OPEN of a region but when transitioning from " +
+            " OPENING to OPENING got a version mismatch, someone else clashed " +
+            "so now unassigning");
+        region.close();
+        return;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node from OPENING to OPENED", e);
+      return;
+    } catch (IOException e) {
+      LOG.error("Failed to close region after failing to transition", e);
+      return;
+    }
+
+    // Update ZK, ROOT or META
+    try {
+      this.rsServices.postOpenDeployTasks(region,
+        this.server.getCatalogTracker(), false);
+    } catch (IOException e) {
+      // TODO: rollback the open?
+      LOG.error("Error updating region location in catalog table", e);
+    } catch (KeeperException e) {
+      // TODO: rollback the open?
+      LOG.error("ZK Error updating region location in catalog table", e);
+    }
+
+    // Finally, Transition ZK node to OPENED
+    try {
+      if(ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo,
+          server.getServerName(), openingVersion) == -1) {
+        LOG.warn("Completed the OPEN of a region but when transitioning from " +
+            " OPENING to OPENED got a version mismatch, someone else clashed " +
+            "so now unassigning");
+        region.close();
+        return;
+      }
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node from OPENING to OPENED", e);
+      return;
+    } catch (IOException e) {
+      LOG.error("Failed to close region after failing to transition", e);
+      return;
+    }
+
+    // Done!  Successful region open
+    LOG.debug("Opened " + region.getRegionNameAsString());
+  }
+
+  /**
+   * Transition this region's ZK unassigned node from OFFLINE to OPENING.
+   * @param encodedName encoded region name; used only in log messages here
+   * @return the version of the znode after a successful transition, or -1 if
+   * the transition failed or threw (already logged); on -1 the caller should
+   * abort the open.
+   */
+  int transitionZookeeperOfflineToOpening(final String encodedName) {
+    // Transition ZK node from OFFLINE to OPENING
+    // TODO: should also handle transition from CLOSED?
+    int openingVersion = -1;
+    try {
+      if ((openingVersion = ZKAssign.transitionNodeOpening(server.getZooKeeper(),
+          regionInfo, server.getServerName())) == -1) {
+        LOG.warn("Error transitioning node from OFFLINE to OPENING, " +
+            "aborting open");
+      }
+    } catch (KeeperException e) {
+      LOG.error("Error transitioning node from OFFLINE to OPENING for region " +
+        encodedName, e);
+    }
+    return openingVersion;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Handles opening of the root region on a region server.
+ * <p>
+ * This is executed after receiving an OPEN RPC from the master for root.
+ */
+public class OpenRootHandler extends OpenRegionHandler {
+  /**
+   * @param server server this handler runs on
+   * @param rsServices services of the hosting region server
+   * @param regionInfo info of the root region being opened
+   */
+  public OpenRootHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo) {
+    super(server, rsServices, regionInfo, EventType.M2RS_OPEN_ROOT);
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Aug 31 23:51:44 2010
@@ -47,6 +47,7 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -78,8 +79,9 @@ import org.apache.hadoop.hbase.util.Clas
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.util.concurrent.NamingThreadFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * HLog stores all the edits to the HStore.  Its the hbase write-ahead-log
@@ -136,15 +138,15 @@ public class HLog implements Syncable {
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
-  private final LogRollListener listener;
+  // Listeners that are called on WAL events.
+  private List<WALObserver> listeners =
+    new CopyOnWriteArrayList<WALObserver>();
   private final long optionalFlushInterval;
   private final long blocksize;
   private final int flushlogentries;
   private final String prefix;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private final Path oldLogDir;
-  private final List<LogActionsListener> actionListeners =
-      Collections.synchronizedList(new ArrayList<LogActionsListener>());
 
 
   private static Class<? extends Writer> logWriterClass;
@@ -187,7 +189,8 @@ public class HLog implements Syncable {
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
-   * Map of regions to first sequence/edit id in their memstore.
+   * Map of regions to most recent sequence/edit id in their memstore.
+   * Key is encoded region name.
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@@ -228,9 +231,6 @@ public class HLog implements Syncable {
    */
   private final LogSyncer logSyncerThread;
 
-  private final List<LogEntryVisitor> logEntryVisitors =
-      new CopyOnWriteArrayList<LogEntryVisitor>();
-
   /**
    * Pattern used to validate a HLog file name
    */
@@ -278,19 +278,18 @@ public class HLog implements Syncable {
   }
 
   /**
-   * HLog creating with a null actions listener.
+   * Constructor.
    *
    * @param fs filesystem handle
    * @param dir path to where hlogs are stored
    * @param oldLogDir path to where hlogs are archived
    * @param conf configuration to use
-   * @param listener listerner used to request log rolls
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
-              final Configuration conf, final LogRollListener listener)
+              final Configuration conf)
   throws IOException {
-    this(fs, dir, oldLogDir, conf, listener, null, null);
+    this(fs, dir, oldLogDir, conf, null, null);
   }
 
   /**
@@ -304,22 +303,27 @@ public class HLog implements Syncable {
    * @param dir path to where hlogs are stored
    * @param oldLogDir path to where hlogs are archived
    * @param conf configuration to use
-   * @param listener listerner used to request log rolls
-   * @param actionListener optional listener for hlog actions like archiving
+   * @param listeners Listeners on WAL events. Listeners passed here will
+   * be registered before we do anything else; e.g. before the constructor
+   * calls {@link #rollWriter()}.
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
-              final Configuration conf, final LogRollListener listener,
-              final LogActionsListener actionListener, final String prefix)
+    final Configuration conf, final List<WALObserver> listeners,
+    final String prefix)
   throws IOException {
     super();
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
-    this.listener = listener;
+    if (listeners != null) {
+      for (WALObserver i: listeners) {
+        registerWALActionsListener(i);
+      }
+    }
     this.flushlogentries =
       conf.getInt("hbase.regionserver.flushlogentries", 1);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
@@ -339,14 +343,12 @@ public class HLog implements Syncable {
     }
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
-    LOG.info("HLog configuration: blocksize=" + this.blocksize +
-      ", rollsize=" + this.logrollsize +
+    LOG.info("HLog configuration: blocksize=" +
+      StringUtils.byteDesc(this.blocksize) +
+      ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
       ", enabled=" + this.enabled +
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
-    if (actionListener != null) {
-      addLogActionsListerner(actionListener);
-    }
     // If prefix is null||empty then just name it hlog
     this.prefix = prefix == null || prefix.isEmpty() ?
         "hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -355,22 +357,26 @@ public class HLog implements Syncable {
 
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = null;
-    if(this.hdfs_out != null) {
+    Exception exception = null;
+    if (this.hdfs_out != null) {
       try {
         this.getNumCurrentReplicas = this.hdfs_out.getClass().
           getMethod("getNumCurrentReplicas", new Class<?> []{});
         this.getNumCurrentReplicas.setAccessible(true);
       } catch (NoSuchMethodException e) {
         // Thrown if getNumCurrentReplicas() function isn't available
+        exception = e;
       } catch (SecurityException e) {
         // Thrown if we can't get access to getNumCurrentReplicas()
+        exception = e;
         this.getNumCurrentReplicas = null; // could happen on setAccessible()
       }
     }
-    if(this.getNumCurrentReplicas != null) {
+    if (this.getNumCurrentReplicas != null) {
       LOG.info("Using getNumCurrentReplicas--HDFS-826");
     } else {
-      LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
+      LOG.info("getNumCurrentReplicas--HDFS-826 not available; hdfs_out=" +
+        this.hdfs_out + ", exception=" + exception.getMessage());
     }
 
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
@@ -378,6 +384,14 @@ public class HLog implements Syncable {
         Thread.currentThread().getName() + ".logSyncer");
   }
 
+  public void registerWALActionsListener (final WALObserver listener) {
+    this.listeners.add(listener);
+  }
+
+  public boolean unregisterWALActionsListener(final WALObserver listener) {
+    return this.listeners.remove(listener);
+  }
+
   /**
    * @return Current state of the monotonically increasing file id.
    */
@@ -429,7 +443,8 @@ public class HLog implements Syncable {
    * for the lock on this and consequently never release the cacheFlushLock
    *
    * @return If lots of logs, flush the returned regions so next time through
-   * we can clean logs. Returns null if nothing to flush.
+   * we can clean logs. Returns null if nothing to flush.  Names are encoded
+   * region names as returned by {@link HRegionInfo#getEncodedName()}
    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
    * @throws IOException
    */
@@ -475,14 +490,14 @@ public class HLog implements Syncable {
         this.numEntries.set(0);
       }
       // Tell our listeners that a new log was created
-      if (!this.actionListeners.isEmpty()) {
-        for (LogActionsListener list : this.actionListeners) {
-          list.logRolled(newPath);
+      if (!this.listeners.isEmpty()) {
+        for (WALObserver i : this.listeners) {
+          i.logRolled(newPath);
         }
       }
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.size() <= 0) {
+        if (this.lastSeqWritten.isEmpty()) {
           LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
           // If so, then no new writes have come in since all regions were
           // flushed (and removed from the lastSeqWritten map). Means can
@@ -557,7 +572,8 @@ public class HLog implements Syncable {
   /*
    * Clean up old commit logs.
    * @return If lots of logs, flush the returned region so next time through
-   * we can clean logs. Returns null if nothing to flush.
+   * we can clean logs. Returns null if nothing to flush.  Returns array of
+   * encoded region names to flush.
    * @throws IOException
    */
   private byte [][] cleanOldLogs() throws IOException {
@@ -584,10 +600,12 @@ public class HLog implements Syncable {
     }
 
     // If too many log files, figure which regions we need to flush.
+    // Array is an array of encoded region names.
     byte [][] regions = null;
     int logCount = this.outputfiles.size() - logsToRemove;
     if (logCount > this.maxLogs && this.outputfiles != null &&
         this.outputfiles.size() > 0) {
+      // This is an array of encoded region names.
       regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
         this.lastSeqWritten);
       StringBuilder sb = new StringBuilder();
@@ -631,6 +649,10 @@ public class HLog implements Syncable {
     return Collections.min(this.lastSeqWritten.values());
   }
 
+  /**
+   * @param oldestOutstandingSeqNum
+   * @return (Encoded) name of oldest outstanding region.
+   */
   private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
     byte [] oldestRegion = null;
     for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
@@ -758,7 +780,7 @@ public class HLog implements Syncable {
     final long now,
     final boolean isMetaRegion)
   throws IOException {
-    byte [] regionName = regionInfo.getRegionName();
+    byte [] regionName = regionInfo.getEncodedNameAsBytes();
     byte [] tableName = regionInfo.getTableDesc().getName();
     this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
   }
@@ -787,7 +809,6 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    byte [] regionName = regionInfo.getRegionName();
     synchronized (updateLock) {
       long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
@@ -796,7 +817,8 @@ public class HLog implements Syncable {
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
+      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+        Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit);
       this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
@@ -807,8 +829,8 @@ public class HLog implements Syncable {
   }
 
   /**
-   * Append a set of edits to the log. Log edits are keyed by regionName,
-   * rowname, and log-sequence-id.
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id.
    *
    * Later, if we sort by these keys, we obtain all the relevant edits for a
    * given key-range of the HRegion (TODO). Any edits that do not have a
@@ -833,8 +855,6 @@ public class HLog implements Syncable {
     final long now)
   throws IOException {
     if (edits.isEmpty()) return;
-    
-    byte[] regionName = info.getRegionName();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -845,8 +865,11 @@ public class HLog implements Syncable {
       // memstore). . When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, seqNum);
-      HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
+      // Use encoded name.  It's shorter, guaranteed unique and a subset of
+      // the actual name.
+      byte [] hriKey = info.getEncodedNameAsBytes();
+      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
       doWrite(info, logKey, edits);
       this.numEntries.incrementAndGet();
 
@@ -910,7 +933,7 @@ public class HLog implements Syncable {
         LOG.error("Error while syncing, requesting close of hlog ", e);
         requestLogRoll();
       } catch (InterruptedException e) {
-        LOG.debug(getName() + "interrupted while waiting for sync requests");
+        LOG.debug(getName() + " interrupted while waiting for sync requests");
       } finally {
         syncerShuttingDown = true;
         syncDone.signalAll();
@@ -1043,8 +1066,10 @@ public class HLog implements Syncable {
   }
 
   private void requestLogRoll() {
-    if (this.listener != null) {
-      this.listener.logRollRequested();
+    if (!this.listeners.isEmpty()) {
+      for (WALObserver i: this.listeners) {
+        i.logRollRequested();
+      }
     }
   }
 
@@ -1053,9 +1078,9 @@ public class HLog implements Syncable {
     if (!this.enabled) {
       return;
     }
-    if (!this.logEntryVisitors.isEmpty()) {
-      for (LogEntryVisitor visitor : this.logEntryVisitors) {
-        visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+    if (!this.listeners.isEmpty()) {
+      for (WALObserver i: this.listeners) {
+        i.visitLogEntryBeforeWrite(info, logKey, logEdit);
       }
     }
     try {
@@ -1115,14 +1140,13 @@ public class HLog implements Syncable {
    *
    * Protected by cacheFlushLock
    *
-   * @param regionName
+   * @param encodedRegionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
-  public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId,
-    final boolean isMetaRegion)
+  public void completeCacheFlush(final byte [] encodedRegionName,
+      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
   throws IOException {
     try {
       if (this.closed) {
@@ -1131,15 +1155,15 @@ public class HLog implements Syncable {
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
         WALEdit edit = completeCacheFlushLogEdit();
-        HLogKey key = makeKey(regionName, tableName, logSeqId,
+        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
             System.currentTimeMillis());
         this.writer.append(new Entry(key, edit));
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
-        Long seq = this.lastSeqWritten.get(regionName);
+        Long seq = this.lastSeqWritten.get(encodedRegionName);
         if (seq != null && logSeqId >= seq.longValue()) {
-          this.lastSeqWritten.remove(regionName);
+          this.lastSeqWritten.remove(encodedRegionName);
         }
       }
       // sync txn to file system
@@ -1475,9 +1499,10 @@ public class HLog implements Syncable {
       conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
     HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
-    NamingThreadFactory f  = new NamingThreadFactory(
-            "SplitWriter-%1$d", Executors.defaultThreadFactory());
-    ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("SplitWriter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
     for (final byte [] region : splitLogsMap.keySet()) {
       Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
       writeFutureResult.put(region, threadPool.submit(splitter));
@@ -1558,7 +1583,7 @@ public class HLog implements Syncable {
     try {
       Entry entry;
       while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getRegionName();
+        byte[] region = entry.getKey().getEncodedRegionName();
         LinkedList<Entry> queue = splitLogsMap.get(region);
         if (queue == null) {
           queue = new LinkedList<Entry>();
@@ -1682,7 +1707,7 @@ public class HLog implements Syncable {
     Path tableDir = HTableDescriptor.getTableDir(rootDir,
       logEntry.getKey().getTablename());
     Path regiondir = HRegion.getRegionDir(tableDir,
-      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+      Bytes.toString(logEntry.getKey().getEncodedRegionName()));
     Path dir = getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
@@ -1759,32 +1784,6 @@ public class HLog implements Syncable {
     return new Path(regiondir, RECOVERED_EDITS_DIR);
   }
 
-  /**
-   *
-   * @param visitor
-   */
-  public void addLogEntryVisitor(LogEntryVisitor visitor) {
-    this.logEntryVisitors.add(visitor);
-  }
-
-  /**
-   * 
-   * @param visitor
-   */
-  public void removeLogEntryVisitor(LogEntryVisitor visitor) {
-    this.logEntryVisitors.remove(visitor);
-  }
-
-
-  public void addLogActionsListerner(LogActionsListener list) {
-    LOG.info("Adding a listener");
-    this.actionListeners.add(list);
-  }
-
-  public boolean removeLogActionsListener(LogActionsListener list) {
-    return this.actionListeners.remove(list);
-  }
-
   private static void usage() {
     System.err.println("Usage: java org.apache.hbase.HLog" +
         " {--dump <logfile>... | --split <logdir>...}");
@@ -1846,5 +1845,4 @@ public class HLog implements Syncable {
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
       ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
-}
+}
\ No newline at end of file



Mime
View raw message