hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r1068206 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/coprocessor/...
Date Mon, 07 Feb 2011 23:01:29 GMT
Author: apurtell
Date: Mon Feb  7 23:01:28 2011
New Revision: 1068206

URL: http://svn.apache.org/viewvc?rev=1068206&view=rev
Log:
HBASE-3257 Coprocessors: Extend server side API to include HLog operations

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Feb  7 23:01:28 2011
@@ -49,8 +49,6 @@ Release 0.91.0 - Unreleased
 
 
   IMPROVEMENTS
-   HBASE-2001  Coprocessors: Colocate user code with regions (Mingjie Lai via
-               Andrew Purtell)
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)  
    HBASE-3292  Expose block cache hit/miss/evict counts into region server
                metrics
@@ -60,9 +58,6 @@ Release 0.91.0 - Unreleased
    HBASE-1861  Multi-Family support for bulk upload tools
    HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot
    HBASE-3328  Added Admin API to specify explicit split points
-   HBASE-3345  Coprocessors: Allow observers to completely override base
-               function
-   HBASE-3260  Coprocessors: Add explicit lifecycle management
    HBASE-3377  Upgrade Jetty to 6.1.26
    HBASE-3387  Pair does not deep check arrays for equality
                (Jesse Yates via Stack)
@@ -77,16 +72,24 @@ Release 0.91.0 - Unreleased
 
 
   NEW FEATURES
+   HBASE-2001  Coprocessors: Colocate user code with regions (Mingjie Lai via
+               Andrew Purtell)
    HBASE-3287  Add option to cache blocks on hfile write and evict blocks on
                hfile close
    HBASE-3335  Add BitComparator for filtering (Nathaniel Cook via Stack)
+   HBASE-3260  Coprocessors: Add explicit lifecycle management
    HBASE-3256  Coprocessors: Coprocessor host and observer for HMaster
+   HBASE-3345  Coprocessors: Allow observers to completely override base
+               function
    HBASE-3448  RegionSplitter, utility class to manually split tables
    HBASE-2824  A filter that randomly includes rows based on a configured
                chance (Ferdy via Andrew Purtell)
    HBASE-3455  Add memstore-local allocation buffers to combat heap
                fragmentation in the region server. Enabled by default as of
                0.91
+   HBASE-3257  Coprocessors: Extend server side API to include HLog operations
+               (Mingjie Lai via Andrew Purtell)
+
 
 Release 0.90.1 - Unreleased
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
Mon Feb  7 23:01:28 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coproces
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 import java.io.IOException;
 
@@ -222,4 +225,14 @@ public abstract class BaseRegionObserver
   public void postScannerClose(final RegionCoprocessorEnvironment e,
       final InternalScanner s) throws IOException {
   }
+
+  @Override
+  public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
+      HLogKey logKey, WALEdit logEdit) throws IOException {
+  }
+
+  @Override
+  public void postWALRestore(RegionCoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Mon
Feb  7 23:01:28 2011
@@ -51,6 +51,9 @@ public abstract class CoprocessorHost<E 
       "hbase.coprocessor.region.classes";
   public static final String MASTER_COPROCESSOR_CONF_KEY =
       "hbase.coprocessor.master.classes";
+  public static final String WAL_COPROCESSOR_CONF_KEY =
+    "hbase.coprocessor.wal.classes";
+
   private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
   /** Ordered set of loaded coprocessors with lock */
   protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Mon
Feb  7 23:01:28 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coproces
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -29,6 +30,8 @@ import org.apache.hadoop.hbase.client.In
 import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 import java.io.IOException;
 
@@ -529,4 +532,30 @@ public interface RegionObserver extends 
   public void postScannerClose(final RegionCoprocessorEnvironment e,
       final InternalScanner s)
     throws IOException;
+
+  /**
+   * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is replayed for this region.
+   *
+   * @param env
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @throws IOException
+   */
+  void preWALRestore(final RegionCoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+  /**
+   * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is replayed for this region.
+   *
+   * @param env
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @throws IOException
+   */
+  void postWALRestore(final RegionCoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
 }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
Mon Feb  7 23:01:28 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
+  /** @return reference to the write ahead log (HLog) */
+  public HLog getWAL();
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java Mon Feb
 7 23:01:28 2011
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import java.io.IOException;
+
+/**
+ * It's provided to have a way for coprocessors to observe, rewrite,
+ * or skip WALEdits as they are being written to the WAL.
+ *
+ * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} provides
+ * hooks for adding logic for WALEdits in the region context during reconstruction.
+ *
+ * Defines coprocessor hooks for interacting with operations on the
+ * {@link org.apache.hadoop.hbase.regionserver.wal.HLog}.
+ */
+public interface WALObserver extends Coprocessor {
+
+  /**
+   * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is written to the WAL.
+   *
+   * @param env
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @return true if default behavior should be bypassed, false otherwise
+   * @throws IOException
+   */
+  boolean preWALWrite(CoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+  /**
+   * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+   * is written to the WAL.
+   *
+   * @param env
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @throws IOException
+   */
+  void postWALWrite(CoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Feb  7
23:01:28 2011
@@ -2009,6 +2009,16 @@ public class HRegion implements HeapSize
       while ((entry = reader.next()) != null) {
         HLogKey key = entry.getKey();
         WALEdit val = entry.getEdit();
+
+        // Start coprocessor replay here. The coprocessor is for each WALEdit
+        // instead of a KeyValue.
+        if (coprocessorHost != null) {
+          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
+            // if bypass this log entry, ignore it ...
+            continue;
+          }
+        }
+
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
@@ -2046,6 +2056,10 @@ public class HRegion implements HeapSize
         }
         if (flush) internalFlushcache(null, currentEditSeqId);
 
+        if (coprocessorHost != null) {
+          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+        }
+
         // Every 'interval' edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
         if (reporter != null && (editsCount % interval) == 0) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
Mon Feb  7 23:01:28 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.coprocessor.*;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -1004,4 +1007,56 @@ public class RegionCoprocessorHost
       coprocessorLock.readLock().unlock();
     }
   }
+
+  /**
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @return true if default behavior should be bypassed, false otherwise
+   * @throws IOException
+   */
+  public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
+      WALEdit logEdit) throws IOException {
+    try {
+      boolean bypass = false;
+      coprocessorLock.readLock().lock();
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preWALRestore(env, info, logKey,
+              logEdit);
+        }
+        bypass |= env.shouldBypass();
+        if (env.shouldComplete()) {
+          break;
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @throws IOException
+   */
+  public void postWALRestore(HRegionInfo info, HLogKey logKey,
+      WALEdit logEdit) throws IOException {
+    try {
+      coprocessorLock.readLock().lock();
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postWALRestore(env, info,
+              logKey, logEdit);
+        }
+        if (env.shouldComplete()) {
+          break;
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Feb 
7 23:01:28 2011
@@ -135,6 +135,8 @@ public class HLog implements Syncable {
   private static Class<? extends Writer> logWriterClass;
   private static Class<? extends Reader> logReaderClass;
 
+  private WALCoprocessorHost coprocessorHost;
+
   static void resetLogReaderClass() {
     HLog.logReaderClass = null;
   }
@@ -400,6 +402,7 @@ public class HLog implements Syncable {
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
     Threads.setDaemonThreadRunning(logSyncerThread,
         Thread.currentThread().getName() + ".logSyncer");
+    coprocessorHost = new WALCoprocessorHost(this, conf);
   }
 
   public void registerWALActionsListener (final WALObserver listener) {
@@ -1074,8 +1077,13 @@ public class HLog implements Syncable {
     }
     try {
       long now = System.currentTimeMillis();
-      this.writer.append(new HLog.Entry(logKey, logEdit));
+      // coprocessor hook:
+      if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
+        // if not bypassed:
+        this.writer.append(new HLog.Entry(logKey, logEdit));
+      }
       long took = System.currentTimeMillis() - now;
+      coprocessorHost.postWALWrite(info, logKey, logEdit);
       writeTime += took;
       writeOps++;
       if (took > 1000) {
@@ -1445,6 +1453,13 @@ public class HLog implements Syncable {
   }
 
   /**
+   * @return Coprocessor host.
+   */
+  public WALCoprocessorHost getCoprocessorHost() {
+    return coprocessorHost;
+  }
+
+  /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.
    *

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
Mon Feb  7 23:01:28 2011
@@ -0,0 +1,142 @@
+
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the coprocessor environment and runtime support for coprocessors
+ * loaded within a {@link HLog}.
+ */
+public class WALCoprocessorHost
+    extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
+  
+  private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
+
+  /**
+   * Encapsulation of the environment of each coprocessor
+   */
+  static class WALEnvironment extends CoprocessorHost.Environment
+    implements WALCoprocessorEnvironment {
+
+    private HLog wal;
+
+    @Override
+    public HLog getWAL() {
+      return wal;
+    }
+
+    /**
+     * Constructor
+     * @param impl the coprocessor instance
+     * @param priority chaining priority
+     */
+    public WALEnvironment(Class<?> implClass, final Coprocessor impl,
+        Coprocessor.Priority priority, final HLog hlog) {
+      super(impl, priority);
+      this.wal = hlog;
+    }
+  }
+
+  HLog wal;
+  /**
+   * Constructor. Loads the system coprocessors configured under
+   * WAL_COPROCESSOR_CONF_KEY.
+   * @param log the HLog this coprocessor host is attached to
+   * @param conf the configuration
+   */
+  public WALCoprocessorHost(final HLog log, final Configuration conf) {
+    this.wal = log;
+    // load system default cp's from configuration.
+    loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
+  }
+
+  @Override
+  public WALEnvironment createEnvironment(Class<?> implClass,
+      Coprocessor instance, Priority priority) {
+    // Bind the new coprocessor environment to this host's WAL.
+    return new WALEnvironment(implClass, instance, priority, this.wal);
+  }
+
+  /**
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @return true if default behavior should be bypassed, false otherwise
+   * @throws IOException
+   */
+  public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
+      throws IOException {
+    try {
+      boolean bypass = false;
+      coprocessorLock.readLock().lock();
+      for (WALEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof 
+            org.apache.hadoop.hbase.coprocessor.WALObserver) {
+          ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
+              preWALWrite(env, info, logKey, logEdit);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @param info
+   * @param logKey
+   * @param logEdit
+   * @throws IOException
+   */
+  public void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
+      throws IOException {
+    try {
+      coprocessorLock.readLock().lock();
+      for (WALEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof 
+            org.apache.hadoop.hbase.coprocessor.WALObserver) {
+          ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
+              postWALWrite(env, info, logKey, logEdit);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
(added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
Mon Feb  7 23:01:28 2011
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Class for testing WAL coprocessor extension. WAL write monitor is defined
+ * in WALObserver while WAL restore is in RegionObserver.
+ *
+ * It will monitor WAL writing and restore, and modify the passed-in WALEdit,
+ * i.e., ignore specified columns when writing, and add a KeyValue. On the other
+ * hand, it checks whether the ignored column is still in the WAL when restored
+ * at region reconstruction.
+ */
+public class SampleRegionWALObserver extends BaseRegionObserverCoprocessor
+implements WALObserver {
+
+  private static final Log LOG = LogFactory.getLog(SampleRegionWALObserver.class);
+
+  private byte[] tableName;
+  private byte[] row;
+  private byte[] ignoredFamily;
+  private byte[] ignoredQualifier;
+  private byte[] addedFamily;
+  private byte[] addedQualifier;
+  private byte[] changedFamily;
+  private byte[] changedQualifier;
+
+  private boolean preWALWriteCalled = false;
+  private boolean postWALWriteCalled = false;
+  private boolean preWALRestoreCalled = false;
+  private boolean postWALRestoreCalled = false;
+
+  /**
+   * Set values: with a table name, a column name which will be ignored, and
+   * a column name which will be added to WAL.
+   */
+  public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
+      byte[] chf, byte[] chq, byte[] addf, byte[] addq) {
+    this.row = row;
+    this.tableName = tableName;
+    this.ignoredFamily = igf;
+    this.ignoredQualifier = igq;
+    this.addedFamily = addf;
+    this.addedQualifier = addq;
+    this.changedFamily = chf;
+    this.changedQualifier = chq;
+  }
+
+
+  @Override
+  public void postWALWrite(CoprocessorEnvironment env, HRegionInfo info,
+      HLogKey logKey, WALEdit logEdit) throws IOException {
+    postWALWriteCalled = true;
+  }
+
+  @Override
+  public boolean preWALWrite(CoprocessorEnvironment env, HRegionInfo info,
+      HLogKey logKey, WALEdit logEdit) throws IOException {
+    boolean bypass = false;
+    // check table name matches or not.
+    if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) {
+      return bypass;
+    }
+    preWALWriteCalled = true;
+    // here we're going to remove one keyvalue from the WALEdit, and add
+    // another one to it.
+    List<KeyValue> kvs = logEdit.getKeyValues();
+    KeyValue deletedKV = null;
+    for (KeyValue kv : kvs) {
+      // assume only one kv from the WALEdit matches.
+      byte[] family = kv.getFamily();
+      byte[] qulifier = kv.getQualifier();
+
+      if (Arrays.equals(family, ignoredFamily) &&
+          Arrays.equals(qulifier, ignoredQualifier)) {
+        LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
+        deletedKV = kv;
+      }
+      if (Arrays.equals(family, changedFamily) &&
+          Arrays.equals(qulifier, changedQualifier)) {
+        LOG.debug("Found the KeyValue from WALEdit which should be changed.");
+        kv.getBuffer()[kv.getValueOffset()] += 1;
+      }
+    }
+    kvs.add(new KeyValue(row, addedFamily, addedQualifier));
+    if (deletedKV != null) {
+      LOG.debug("About to delete a KeyValue from WALEdit.");
+      kvs.remove(deletedKV);
+    }
+    return bypass;
+  }
+
+  /**
+   * Triggered before  {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
+   * restored.
+   */
+  @Override
+  public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
+      HLogKey logKey, WALEdit logEdit) throws IOException {
+    preWALRestoreCalled = true;
+  }
+
+  /**
+   * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
+   * restored.
+   */
+  @Override
+  public void postWALRestore(RegionCoprocessorEnvironment env,
+      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+    postWALRestoreCalled = true;
+  }
+
+  public boolean isPreWALWriteCalled() {
+    return preWALWriteCalled;
+  }
+
+  public boolean isPostWALWriteCalled() {
+    return postWALWriteCalled;
+  }
+
+  public boolean isPreWALRestoreCalled() {
+    LOG.debug(SampleRegionWALObserver.class.getName() +
+      ".isPreWALRestoreCalled is called.");
+    return preWALRestoreCalled;
+  }
+
+  public boolean isPostWALRestoreCalled() {
+    LOG.debug(SampleRegionWALObserver.class.getName() +
+      ".isPostWALRestoreCalled is called.");
+    return postWALRestoreCalled;
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
Mon Feb  7 23:01:28 2011
@@ -67,6 +67,8 @@ public class SimpleRegionObserver extend
   boolean hadPostGetClosestRowBefore = false;
   boolean hadPreIncrement = false;
   boolean hadPostIncrement = false;
+  boolean hadPreWALRestored = false;
+  boolean hadPostWALRestored = false;
 
   @Override
   public void preOpen(RegionCoprocessorEnvironment e) {
@@ -333,4 +335,12 @@ public class SimpleRegionObserver extend
   boolean hadPostIncrement() {
     return hadPostIncrement;
   }
+
+  /**
+   * @return the current value of the {@code hadPreWALRestored} flag
+   */
+  boolean hadPreWALRestored() {
+    return hadPreWALRestored;
+  }
+
+  /**
+   * @return the current value of the {@code hadPostWALRestored} flag
+   */
+  boolean hadPostWALRestored() {
+    return hadPostWALRestored;
+  }
 }

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
(added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
Mon Feb  7 23:01:28 2011
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.WALObserver}
+ * interface hooks when edits are written to the WAL and when a WAL is replayed.
+ */
+public class TestWALCoprocessors {
+  private static final Log LOG = LogFactory.getLog(TestWALCoprocessors.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  // Fixture data. Entry i of TEST_FAMILY, TEST_QUALIFIER and TEST_VALUE is used
+  // together as one column when the tests build a Put. The references are
+  // never reassigned, so they are declared final.
+  private static final byte[] TEST_TABLE = Bytes.toBytes("observedTable");
+  private static final byte[][] TEST_FAMILY = {
+    Bytes.toBytes("fam1"),
+    Bytes.toBytes("fam2"),
+    Bytes.toBytes("fam3"),
+  };
+  private static final byte[][] TEST_QUALIFIER = {
+    Bytes.toBytes("q1"),
+    Bytes.toBytes("q2"),
+    Bytes.toBytes("q3"),
+  };
+  private static final byte[][] TEST_VALUE = {
+    Bytes.toBytes("v1"),
+    Bytes.toBytes("v2"),
+    Bytes.toBytes("v3"),
+  };
+  private static final byte[] TEST_ROW = Bytes.toBytes("testRow");
+
+  // Per-test state, (re)initialized in setUp(). The former "cluster" field was
+  // never assigned or read and has been dropped.
+  private Configuration conf;
+  private FileSystem fs;
+  private Path dir;
+  private Path hbaseRootDir;
+  private Path oldLogDir;
+  private Path logDir;
+
+  /**
+   * Registers {@link SampleRegionWALObserver} as both the WAL coprocessor and
+   * the region coprocessor, starts a one-node mini cluster, and stores the
+   * qualified "/hbase" path under {@link HConstants#HBASE_DIR}.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    final String observerClass = SampleRegionWALObserver.class.getName();
+    final Configuration clusterConf = TEST_UTIL.getConfiguration();
+    clusterConf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, observerClass);
+    clusterConf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, observerClass);
+    clusterConf.setBoolean("dfs.support.append", true);
+    clusterConf.setInt("dfs.client.block.recovery.retries", 2);
+    clusterConf.setInt("hbase.regionserver.flushlogentries", 1);
+
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
+
+    // The root directory can only be qualified once the DFS cluster is up.
+    final FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    final Path rootDir = dfs.makeQualified(new Path("/hbase"));
+    LOG.info("hbase.rootdir=" + rootDir);
+    clusterConf.set(HConstants.HBASE_DIR, rootDir.toString());
+  }
+
+  /**
+   * Shuts down the mini cluster started in {@link #setupBeforeClass()}.
+   */
+  @AfterClass
+  public static void teardownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Gives each test its own copy of the cluster configuration, derives the
+   * WAL-related directories from the configured HBase root directory, and
+   * removes any root directory left on the file system.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    final FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    this.fs = dfs;
+
+    final Path root = new Path(conf.get(HConstants.HBASE_DIR));
+    this.hbaseRootDir = root;
+    this.dir = new Path(root, TestWALCoprocessors.class.getName());
+    this.oldLogDir = new Path(root, HConstants.HREGION_OLDLOGDIR_NAME);
+    this.logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
+
+    // Start every test from an empty root directory.
+    if (dfs.exists(root)) {
+      dfs.delete(root, true);
+    }
+  }
+
+  /**
+   * Recursively deletes the HBase root directory used by the test that just ran.
+   */
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+  }
+
+  /**
+   * Test WAL write behavior with WALObserver. The coprocessor monitors
+   * a WALEdit written to the WAL and removes, modifies, and adds KeyValues in
+   * that WALEdit.
+   */
+  @Test
+  public void testWWALCoprocessorWriteToWAL() throws Exception {
+    HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
+    Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
+    deleteDir(basedir);
+    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+    HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
+    try {
+      SampleRegionWALObserver cp = getCoprocessor(log);
+
+      // TEST_FAMILY[0] shall be removed from WALEdit.
+      // TEST_FAMILY[1] value shall be changed.
+      // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
+      cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
+          TEST_FAMILY[1], TEST_QUALIFIER[1],
+          TEST_FAMILY[2], TEST_QUALIFIER[2]);
+
+      assertFalse(cp.isPreWALWriteCalled());
+      assertFalse(cp.isPostWALWriteCalled());
+
+      // TEST_FAMILY[2] is not in the put, however it shall be added by the
+      // tested coprocessor.
+      // Use a Put to create familyMap.
+      Put p = creatPutWith2Families(TEST_ROW);
+
+      Map<byte [], List<KeyValue>> familyMap = p.getFamilyMap();
+      WALEdit edit = new WALEdit();
+      addFamilyMapToWALEdit(familyMap, edit);
+
+      // The same list instance is inspected before and after the append, so
+      // changes made to the edit by the coprocessor are visible through it.
+      List<KeyValue> kvs = edit.getKeyValues();
+
+      // Before the append the edit holds exactly what the Put produced.
+      assertTrue(containsFamily(kvs, TEST_FAMILY[0]));
+      assertFalse(containsFamily(kvs, TEST_FAMILY[2]));
+      assertFalse(hasValueOtherThan(kvs, TEST_FAMILY[1], TEST_VALUE[1]));
+
+      // This is where the WAL write coprocessor hooks should run.
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      log.append(hri, hri.getTableDesc().getName(), edit, now);
+
+      // The edit shall have been changed by the coprocessor by now.
+      assertFalse(containsFamily(kvs, TEST_FAMILY[0]));
+      assertTrue(containsFamily(kvs, TEST_FAMILY[2]));
+      assertTrue(hasValueOtherThan(kvs, TEST_FAMILY[1], TEST_VALUE[1]));
+
+      assertTrue(cp.isPreWALWriteCalled());
+      assertTrue(cp.isPostWALWriteCalled());
+    } finally {
+      // Release the log even when an assertion fails.
+      log.closeAndDelete();
+    }
+  }
+
+  /* Returns true if any KeyValue in kvs belongs to the given column family. */
+  private static boolean containsFamily(List<KeyValue> kvs, byte[] family) {
+    for (KeyValue kv : kvs) {
+      if (Arrays.equals(kv.getFamily(), family)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Returns true if any KeyValue of the given family in kvs carries a value
+   * different from expectedValue.
+   */
+  private static boolean hasValueOtherThan(List<KeyValue> kvs, byte[] family,
+      byte[] expectedValue) {
+    for (KeyValue kv : kvs) {
+      if (Arrays.equals(kv.getFamily(), family)
+          && !Arrays.equals(kv.getValue(), expectedValue)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Test WAL replay behavior with WALObserver: edits are written to a WAL,
+   * the WAL is split, and a region opened afterwards must report that both
+   * restore hooks of {@link SampleRegionWALObserver} were called.
+   */
+  @Test
+  public void testWALCoprocessorReplay() throws Exception {
+    // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
+    // ultimately called by HRegion::initialize()
+    byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay");
+
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
+    final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName));
+    deleteDir(basedir);
+    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+    HLog wal = createWAL(this.conf);
+    WALEdit edit = new WALEdit();
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    final int countPerFamily = 1000;
+    for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+      addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
+          EnvironmentEdgeManager.getDelegate(), wal);
+    }
+    // NOTE(review): nothing is ever added to this edit, so an empty WALEdit is
+    // appended here; the call is kept to preserve the original sequence.
+    wal.append(hri, tableName, edit, now);
+    // sync to fs.
+    wal.sync();
+
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+        ".replay.wal.secondtime");
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        FileSystem newFS = FileSystem.get(newConf);
+        // Make a new wal for new region open.
+        HLog wal2 = createWAL(newConf);
+        HRegion region2 = new HRegion(basedir, wal2, newFS,
+          newConf, hri, TEST_UTIL.getHBaseCluster().getRegionServer(0));
+        region2.initialize();
+        try {
+          SampleRegionWALObserver cp2 =
+            (SampleRegionWALObserver)region2.getCoprocessorHost().findCoprocessor(
+                SampleRegionWALObserver.class.getName());
+          // TODO: asserting here is problematic.
+          assertNotNull(cp2);
+          assertTrue(cp2.isPreWALRestoreCalled());
+          assertTrue(cp2.isPostWALRestoreCalled());
+        } finally {
+          // Clean up even when one of the assertions above fails.
+          region2.close();
+          wal2.closeAndDelete();
+        }
+        return null;
+      }
+    });
+  }
+  /**
+   * Test to see whether the CP is loaded successfully. There is a duplicate
+   * of this test in TestHLog, but the purpose of that one is to see whether
+   * the loaded CP impacts existing HLog tests.
+   */
+  @Test
+  public void testWALCoprocessorLoaded() throws Exception {
+    HLog log = new HLog(fs, dir, oldLogDir, conf);
+    try {
+      assertNotNull(getCoprocessor(log));
+    } finally {
+      // Release the log even when the assertion fails.
+      log.closeAndDelete();
+    }
+  }
+
+  /*
+   * Looks up the SampleRegionWALObserver instance registered with the
+   * coprocessor host of the given WAL.
+   * @param wal log whose coprocessor host is queried
+   * @return whatever the host's findCoprocessor() returns for the observer's
+   * class name, cast to SampleRegionWALObserver
+   */
+  private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
+    final String observerName = SampleRegionWALObserver.class.getName();
+    return (SampleRegionWALObserver) wal.getCoprocessorHost()
+        .findCoprocessor(observerName);
+  }
+
+  /*
+   * Builds an HRegionInfo around a table descriptor named
+   * <code>tableName</code> that carries one column family per entry of
+   * TEST_FAMILY.
+   * @param tableName Name of table to use when we create HTableDescriptor.
+   * @return region info wrapping the new table descriptor
+   */
+  private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
+    final HTableDescriptor descriptor = new HTableDescriptor(tableName);
+    for (byte[] family : TEST_FAMILY) {
+      descriptor.addFamily(new HColumnDescriptor(family));
+    }
+    return new HRegionInfo(descriptor, null, null, false);
+  }
+
+  /*
+   * Recursively removes a directory if it exists.
+   * @param p Directory to cleanup
+   * @throws IOException if the directory exists but the delete reports failure
+   */
+  private void deleteDir(final Path p) throws IOException {
+    final boolean present = this.fs.exists(p);
+    if (present && !this.fs.delete(p, true)) {
+      throw new IOException("Failed remove of " + p);
+    }
+  }
+
+  /*
+   * Builds a Put for the given row with one column for every test family
+   * except the last one.
+   * @param row row key of the Put
+   * @return the populated Put
+   */
+  private Put creatPutWith2Families(byte[] row) throws IOException {
+    final Put put = new Put(row);
+    // The last family is deliberately left out of the Put.
+    final int familiesInPut = TEST_FAMILY.length - 1;
+    int idx = 0;
+    while (idx < familiesInPut) {
+      put.add(TEST_FAMILY[idx], TEST_QUALIFIER[idx], TEST_VALUE[idx]);
+      idx++;
+    }
+    return put;
+  }
+
+  /**
+   * Appends every KeyValue held in the family map to the given WALEdit.
+   * Copied from HRegion.
+   *
+   * @param familyMap map of family->edits
+   * @param walEdit the destination entry to append into
+   */
+  private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
+      WALEdit walEdit) {
+    for (Map.Entry<byte[], List<KeyValue>> familyEntry : familyMap.entrySet()) {
+      final List<KeyValue> familyEdits = familyEntry.getValue();
+      for (KeyValue keyValue : familyEdits) {
+        walEdit.add(keyValue);
+      }
+    }
+  }
+  /*
+   * Splits the logs under logDir and checks that exactly one split file was
+   * produced and that it exists on the file system.
+   * @param c configuration used to obtain the file system and the splitter
+   * @return path of the single split file
+   */
+  private Path runWALSplit(final Configuration c) throws IOException {
+    final FileSystem splitFs = FileSystem.get(c);
+    final List<Path> splits = HLogSplitter
+        .createLogSplitter(c, this.hbaseRootDir, this.logDir, this.oldLogDir, splitFs)
+        .splitLog();
+    // Split should generate only 1 file since there's only 1 region
+    assertEquals(1, splits.size());
+    final Path splitFile = splits.get(0);
+    // Make sure the file exists
+    assertTrue(splitFs.exists(splitFile));
+    LOG.info("Split file=" + splitFile);
+    return splitFile;
+  }
+  /*
+   * Creates an HLog over logDir and oldLogDir on the file system obtained
+   * from the given configuration.
+   * @param c configuration handed to both FileSystem.get() and the HLog
+   * @return the newly created log
+   */
+  private HLog createWAL(final Configuration c) throws IOException {
+    return new HLog(FileSystem.get(c), logDir, oldLogDir, c);
+  }
+  /*
+   * Appends <code>count</code> single-KeyValue edits for one family to the
+   * WAL. Edit number j uses the string form of j as qualifier and
+   * "family:j" as value.
+   * @param tableName table name passed to each append
+   * @param hri region the edits are appended for
+   * @param rowName row of every KeyValue
+   * @param family column family of every KeyValue
+   * @param count number of edits to append
+   * @param ee source of the timestamps
+   * @param wal log to append to
+   */
+  private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
+      final byte [] rowName, final byte [] family,
+      final int count, EnvironmentEdge ee, final HLog wal)
+  throws IOException {
+    final String valuePrefix = Bytes.toString(family) + ":";
+    int seq = 0;
+    while (seq < count) {
+      final String seqText = Integer.toString(seq);
+      final byte[] qualifier = Bytes.toBytes(seqText);
+      final byte[] value = Bytes.toBytes(valuePrefix + seqText);
+      final WALEdit singleEdit = new WALEdit();
+      final KeyValue keyValue =
+        new KeyValue(rowName, family, qualifier, ee.currentTimeMillis(), value);
+      singleEdit.add(keyValue);
+      wal.append(hri, tableName, singleEdit, ee.currentTimeMillis());
+      seq++;
+    }
+  }
+}
+

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Feb
 7 23:01:28 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -45,6 +46,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.coprocessor.Coprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
@@ -107,6 +111,8 @@ public class TestHLog  {
         .setInt("ipc.client.connect.max.retries", 1);
     TEST_UTIL.getConfiguration().setInt(
         "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALObserver.class.getName());
     TEST_UTIL.startMiniCluster(3);
 
     conf = TEST_UTIL.getConfiguration();
@@ -640,6 +646,18 @@ public class TestHLog  {
     assertEquals(0, log.getNumLogFiles());
   }
 
+  /**
+   * Checks that the WAL coprocessor configured for this test class is loaded
+   * by a newly created HLog. Having it loaded must not break the existing
+   * HLog test cases.
+   */
+  @Test
+  public void testWALCoprocessorLoaded() throws Exception {
+    // test to see whether the coprocessor is loaded or not.
+    HLog log = new HLog(fs, dir, oldLogDir, conf);
+    try {
+      WALCoprocessorHost host = log.getCoprocessorHost();
+      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+      assertNotNull(c);
+    } finally {
+      // Release the log even when the assertion fails.
+      log.closeAndDelete();
+    }
+  }
+
   private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
                         int times) throws IOException {
     final byte [] row = Bytes.toBytes("row");



Mime
View raw message