hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1425513 [3/7] - in /hbase/branches/0.94-test: ./ bin/ conf/ security/src/main/java/org/apache/hadoop/hbase/ipc/ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/...
Date Sun, 23 Dec 2012 19:34:56 GMT
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Dec 23 19:34:53 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -56,14 +55,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -79,7 +76,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -232,34 +228,15 @@ public class HRegion implements HeapSize
    * The directory for the table this region is part of.
    * This directory contains the directory for this region.
    */
-  private final Path tableDir;
+  final Path tableDir;
 
-  private final HLog log;
-  private final FileSystem fs;
-  private final Configuration conf;
-  private final int rowLockWaitDuration;
+  final HLog log;
+  final FileSystem fs;
+  final Configuration conf;
+  final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-
-  // The internal wait duration to acquire a lock before read/update
-  // from the region. It is not per row. The purpose of this wait time
-  // is to avoid waiting a long time while the region is busy, so that
-  // we can release the IPC handler soon enough to improve the
-  // availability of the region server. It can be adjusted by
-  // tuning configuration "hbase.busy.wait.duration".
-  final long busyWaitDuration;
-  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-
-  // If updating multiple rows in one call, wait longer,
-  // i.e. waiting for busyWaitDuration * # of rows. However,
-  // we can limit the max multiplier.
-  final int maxBusyWaitMultiplier;
-
-  // Max busy wait duration. There is no point to wait longer than the RPC
-  // purge timeout, when a RPC call will be terminated by the RPC engine.
-  final long maxBusyWaitDuration;
-
-  private final HRegionInfo regionInfo;
-  private final Path regiondir;
+  final HRegionInfo regionInfo;
+  final Path regiondir;
   KeyValue.KVComparator comparator;
 
   private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -377,10 +354,6 @@ public class HRegion implements HeapSize
     this.coprocessorHost = null;
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.opMetrics = new OperationMetrics();
-
-    this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-    this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
-    this.maxBusyWaitMultiplier = 2;
   }
 
   /**
@@ -427,17 +400,6 @@ public class HRegion implements HeapSize
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.opMetrics = new OperationMetrics(conf, this.regionInfo);
 
-    this.busyWaitDuration = conf.getLong(
-      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
-    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
-    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
-      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
-        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
-        + maxBusyWaitMultiplier + "). Their product should be positive");
-    }
-    this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
-      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
     /*
      * timestamp.slop provides a server-side constraint on the timestamp. This
      * assumes that you base your TS around currentTimeMillis(). In this case,
@@ -726,7 +688,7 @@ public class HRegion implements HeapSize
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
-    }
+    }  
     return this.memstoreSize.getAndAdd(memStoreSize);
   }
 
@@ -752,7 +714,7 @@ public class HRegion implements HeapSize
 
     // and then create the file
     Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-
+    
     // if datanode crashes or if the RS goes down just before the close is called while trying to
     // close the created regioninfo file in the .tmp directory then on next
     // creation we will be getting AlreadyCreatedException.
@@ -760,7 +722,7 @@ public class HRegion implements HeapSize
     if (FSUtils.isExists(fs, tmpPath)) {
       FSUtils.delete(fs, tmpPath, true);
     }
-
+    
     FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
 
     try {
@@ -777,26 +739,6 @@ public class HRegion implements HeapSize
     }
   }
 
-  /**
-   * @param fs
-   * @param dir
-   * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
-   * @throws IOException
-   */
-  public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
-  throws IOException {
-    Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
-    if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
-    FSDataInputStream in = fs.open(regioninfo);
-    try {
-      HRegionInfo hri = new HRegionInfo();
-      hri.readFields(in);
-      return hri;
-    } finally {
-      in.close();
-    }
-  }
-
   /** @return a HRegionInfo object for this region */
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
@@ -941,7 +883,6 @@ public class HRegion implements HeapSize
 
     this.closing.set(true);
     status.setStatus("Disabling writes for close");
-    // block waiting for the lock for closing
     lock.writeLock().lock();
     try {
       if (this.isClosed()) {
@@ -1043,16 +984,19 @@ public class HRegion implements HeapSize
     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
   }
 
-  static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
       final String threadNamePrefix) {
-    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-      new ThreadFactory() {
-        private int count = 1;
-
-        public Thread newThread(Runnable r) {
-          return new Thread(r, threadNamePrefix + "-" + count++);
-        }
-      });
+    ThreadPoolExecutor openAndCloseThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndCloseThreadPool;
   }
 
    /**
@@ -1248,7 +1192,6 @@ public class HRegion implements HeapSize
       return false;
     }
     Preconditions.checkArgument(cr.getHRegion().equals(this));
-    // block waiting for the lock for compaction
     lock.readLock().lock();
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Compacting " + cr.getStore() + " in " + this);
@@ -1328,7 +1271,6 @@ public class HRegion implements HeapSize
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
-    // block waiting for the lock for flushing cache
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
@@ -1464,7 +1406,6 @@ public class HRegion implements HeapSize
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
-    // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
@@ -1723,23 +1664,11 @@ public class HRegion implements HeapSize
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * @param delete delete object
-   * @param writeToWAL append to the write ahead lock or not
-   * @throws IOException read exceptions
-   */
-  public void delete(Delete delete, boolean writeToWAL)
-  throws IOException {
-    delete(delete, null, writeToWAL);
-  }
-
   /**
    * @param delete delete object
    * @param lockid existing lock id, or null for grab a lock
    * @param writeToWAL append to the write ahead lock or not
    * @throws IOException read exceptions
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
   throws IOException {
@@ -1855,7 +1784,7 @@ public class HRegion implements HeapSize
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
-    lock(updatesLock.readLock());
+    updatesLock.readLock().lock();
     try {
       prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
 
@@ -1914,7 +1843,6 @@ public class HRegion implements HeapSize
    * @param put
    * @param lockid
    * @throws IOException
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void put(Put put, Integer lockid) throws IOException {
     this.put(put, lockid, put.getWriteToWAL());
@@ -1927,7 +1855,6 @@ public class HRegion implements HeapSize
    * @param lockid
    * @param writeToWAL
    * @throws IOException
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void put(Put put, Integer lockid, boolean writeToWAL)
   throws IOException {
@@ -2013,7 +1940,7 @@ public class HRegion implements HeapSize
     System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
     return batchMutate(mutationsAndLocks);
   }
-
+  
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -2236,7 +2163,7 @@ public class HRegion implements HeapSize
         }
       }
 
-      lock(this.updatesLock.readLock(), numReadyToWrite);
+      this.updatesLock.readLock().lock();
       locked = true;
 
       //
@@ -2367,7 +2294,7 @@ public class HRegion implements HeapSize
 
       // do after lock
       final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-
+            
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
@@ -2401,24 +2328,6 @@ public class HRegion implements HeapSize
   //the getting of the lock happens before, so that you would just pass it into
   //the methods. So in the case of checkAndMutate you could just do lockRow,
   //get, put, unlockRow or something
- /**
-  *
-  * @param row
-  * @param family
-  * @param qualifier
-  * @param compareOp
-  * @param comparator
-  * @param writeToWAL
-  * @throws IOException
-  * @return true if the new put was execute, false otherwise
-  */
- public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
-     CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
-     boolean writeToWAL)
- throws IOException {
-   return checkAndMutate(row, family, qualifier, compareOp, comparator, w, null, writeToWAL);
- }
-  
   /**
    *
    * @param row
@@ -2430,7 +2339,6 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @throws IOException
    * @return true if the new put was execute, false otherwise
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
       CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
@@ -2549,8 +2457,7 @@ public class HRegion implements HeapSize
    * this and the synchronize on 'this' inside in internalFlushCache to send
    * the notify.
    */
-  private void checkResources()
-      throws RegionTooBusyException, InterruptedIOException {
+  private void checkResources() {
 
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
@@ -2568,30 +2475,12 @@ public class HRegion implements HeapSize
           " is >= than blocking " +
           StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
       }
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      long timeToWait = startTime + busyWaitDuration - now;
-      if (timeToWait <= 0L) {
-        final long totalTime = now - startTime;
-        this.updatesBlockedMs.add(totalTime);
-        LOG.info("Failed to unblock updates for region " + this + " '"
-          + Thread.currentThread().getName() + "' in " + totalTime
-          + "ms. The region is still busy.");
-        throw new RegionTooBusyException("region is flushing");
-      }
       blocked = true;
       synchronized(this) {
         try {
-          wait(Math.min(timeToWait, threadWakeFrequency));
-        } catch (InterruptedException ie) {
-          final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-          if (totalTime > 0) {
-            this.updatesBlockedMs.add(totalTime);
-          }
-          LOG.info("Interrupted while waiting to unblock updates for region "
-            + this + " '" + Thread.currentThread().getName() + "'");
-          InterruptedIOException iie = new InterruptedIOException();
-          iie.initCause(ie);
-          throw iie;
+          wait(threadWakeFrequency);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
       }
     }
@@ -2658,7 +2547,7 @@ public class HRegion implements HeapSize
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
-    lock(this.updatesLock.readLock());
+    this.updatesLock.readLock().lock();
     try {
       checkFamilies(familyMap.keySet());
       checkTimestamps(familyMap, now);
@@ -2689,7 +2578,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
+    
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -3283,7 +3172,6 @@ public class HRegion implements HeapSize
    * @param lockId  The lock ID to release.
    */
   public void releaseRowLock(final Integer lockId) {
-    if (lockId == null) return; // null lock id, do nothing
     HashedBytes rowKey = lockIds.remove(lockId);
     if (rowKey == null) {
       LOG.warn("Release unknown lockId: " + lockId);
@@ -3524,10 +3412,6 @@ public class HRegion implements HeapSize
       this(scan, null);
     }
 
-    @Override
-    public long getMvccReadPoint() {
-      return this.readPt;
-    }
     /**
      * Reset both the filter and the old filter.
      */
@@ -3538,7 +3422,7 @@ public class HRegion implements HeapSize
     }
 
     @Override
-    public boolean next(List<KeyValue> outResults, int limit)
+    public synchronized boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       return next(outResults, limit, null);
     }
@@ -3558,42 +3442,30 @@ public class HRegion implements HeapSize
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
 
-        return nextRaw(outResults, limit, metric);
-      } finally {
-        closeRegionOperation();
-      }
-    }
+        results.clear();
 
-    @Override
-    public boolean nextRaw(List<KeyValue> outResults, String metric)
-        throws IOException {
-      return nextRaw(outResults, batch, metric);
-    }
-
-    @Override
-    public boolean nextRaw(List<KeyValue> outResults, int limit,
-        String metric) throws IOException {
-      results.clear();
-
-      boolean returnResult = nextInternal(limit, metric);
+        boolean returnResult = nextInternal(limit, metric);
 
-      outResults.addAll(results);
-      resetFilters();
-      if (isFilterDone()) {
-        return false;
+        outResults.addAll(results);
+        resetFilters();
+        if (isFilterDone()) {
+          return false;
+        }
+        return returnResult;
+      } finally {
+        closeRegionOperation();
       }
-      return returnResult;
     }
 
     @Override
-    public boolean next(List<KeyValue> outResults)
+    public synchronized boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, null);
     }
 
     @Override
-    public boolean next(List<KeyValue> outResults, String metric)
+    public synchronized boolean next(List<KeyValue> outResults, String metric)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, metric);
@@ -3617,16 +3489,8 @@ public class HRegion implements HeapSize
           rpcCall.throwExceptionIfCallerDisconnected();
         }
 
-        KeyValue current = this.storeHeap.peek();
-        byte[] currentRow = null;
-        int offset = 0;
-        short length = 0;
-        if (current != null) {
-          currentRow = current.getBuffer();
-          offset = current.getRowOffset();
-          length = current.getRowLength();
-        }
-        if (isStopRow(currentRow, offset, length)) {
+        byte [] currentRow = peekRow();
+        if (isStopRow(currentRow)) {
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }
@@ -3635,10 +3499,10 @@ public class HRegion implements HeapSize
           }
 
           return false;
-        } else if (filterRowKey(currentRow, offset, length)) {
-          nextRow(currentRow, offset, length);
+        } else if (filterRowKey(currentRow)) {
+          nextRow(currentRow);
         } else {
-          KeyValue nextKv;
+          byte [] nextRow;
           do {
             this.storeHeap.next(results, limit - results.size(), metric);
             if (limit > 0 && results.size() == limit) {
@@ -3648,10 +3512,9 @@ public class HRegion implements HeapSize
               }
               return true; // we are expecting more yes, but also limited to how many we can return.
             }
-            nextKv = this.storeHeap.peek();
-          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+          } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
-          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          final boolean stopRow = isStopRow(nextRow);
 
           // now that we have an entire row, lets process with a filters:
 
@@ -3666,7 +3529,7 @@ public class HRegion implements HeapSize
             // the reasons for calling this method are:
             // 1. reset the filters.
             // 2. provide a hook to fast forward the row (used by subclasses)
-            nextRow(currentRow, offset, length);
+            nextRow(currentRow);
 
             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on.
@@ -3682,25 +3545,29 @@ public class HRegion implements HeapSize
       return filter != null
           && filter.filterRow();
     }
-    private boolean filterRowKey(byte[] row, int offset, short length) {
+    private boolean filterRowKey(byte[] row) {
       return filter != null
-          && filter.filterRowKey(row, offset, length);
+          && filter.filterRowKey(row, 0, row.length);
     }
 
-    protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
-      KeyValue next;
-      while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
-        this.storeHeap.next(MOCKED_LIST);       
+    protected void nextRow(byte [] currentRow) throws IOException {
+      while (Bytes.equals(currentRow, peekRow())) {
+        this.storeHeap.next(MOCKED_LIST);
       }
       results.clear();
       resetFilters();
     }
 
-    private boolean isStopRow(byte [] currentRow, int offset, short length) {
+    private byte[] peekRow() {
+      KeyValue kv = this.storeHeap.peek();
+      return kv == null ? null : kv.getRow();
+    }
+
+    private boolean isStopRow(byte [] currentRow) {
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,
-              currentRow, offset, length) <= isScan);
+              currentRow, 0, currentRow.length) <= isScan);
     }
 
     @Override
@@ -3828,7 +3695,6 @@ public class HRegion implements HeapSize
    * @param conf
    * @param hTableDescriptor
    * @param hlog shared HLog
-   * @param boolean initialize - true to initialize the region
    * @return new HRegion
    *
    * @throws IOException
@@ -3836,36 +3702,7 @@ public class HRegion implements HeapSize
   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                       final Configuration conf,
                                       final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog,
-                                      final boolean initialize)
-      throws IOException {
-    return createHRegion(info, rootDir, conf, hTableDescriptor,
-        hlog, initialize, false);
-  }
-
-  /**
-   * Convenience method creating new HRegions. Used by createTable.
-   * The {@link HLog} for the created region needs to be closed
-   * explicitly, if it is not null.
-   * Use {@link HRegion#getLog()} to get access.
-   *
-   * @param info Info for region to create.
-   * @param rootDir Root directory for HBase instance
-   * @param conf
-   * @param hTableDescriptor
-   * @param hlog shared HLog
-   * @param boolean initialize - true to initialize the region
-   * @param boolean ignoreHLog
-      - true to skip generate new hlog if it is null, mostly for createTable
-   * @return new HRegion
-   *
-   * @throws IOException
-   */
-  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
-                                      final Configuration conf,
-                                      final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog,
-                                      final boolean initialize, final boolean ignoreHLog)
+                                      final HLog hlog)
       throws IOException {
     LOG.info("creating HRegion " + info.getTableNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3877,26 +3714,16 @@ public class HRegion implements HeapSize
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     HLog effectiveHLog = hlog;
-    if (hlog == null && !ignoreHLog) {
+    if (hlog == null) {
       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveHLog, fs, conf, info, hTableDescriptor, null);
-    if (initialize) {
-      region.initialize();
-    }
+    region.initialize();
     return region;
   }
 
-  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
-                                      final Configuration conf,
-                                      final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog)
-    throws IOException {
-    return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
-  }
-
   /**
    * Open a Region.
    * @param info Info for region to be opened.
@@ -4351,19 +4178,9 @@ public class HRegion implements HeapSize
   //
   /**
    * @param get get object
-   * @return result
-   * @throws IOException read exceptions
-   */
-  public Result get(final Get get) throws IOException {
-    return get(get, null);
-  }
-
-  /**
-   * @param get get object
    * @param lockid existing lock id, or null for no previous lock
    * @return result
    * @throws IOException read exceptions
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public Result get(final Get get, final Integer lockid) throws IOException {
     checkRow(get.getRow(), "Get");
@@ -4418,7 +4235,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-
+    
     return results;
   }
 
@@ -4486,7 +4303,7 @@ public class HRegion implements HeapSize
       }
 
       // 3. acquire the region lock
-      lock(this.updatesLock.readLock(), acquiredLocks.size());
+      this.updatesLock.readLock().lock();
       locked = true;
 
       // 4. Get a mvcc write number
@@ -4608,23 +4425,6 @@ public class HRegion implements HeapSize
 
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
-
-  /**
-  *
-  * Perform one or more append operations on a row.
-  * <p>
-  * Appends performed are done under row lock but reads do not take locks out
-  * so this can be seen partially complete by gets and scans.
-  *
-  * @param append
-  * @param writeToWAL
-  * @return new keyvalues after increment
-  * @throws IOException
-  */
- public Result append(Append append, boolean writeToWAL)
-     throws IOException {
-   return append(append, null, writeToWAL);
- }
   /**
    *
    * Perform one or more append operations on a row.
@@ -4637,7 +4437,6 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
@@ -4657,7 +4456,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
-      lock(this.updatesLock.readLock());
+      this.updatesLock.readLock().lock();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4764,10 +4563,10 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
 
-
+    
     long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+    
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4777,22 +4576,6 @@ public class HRegion implements HeapSize
   }
 
   /**
-  *
-  * Perform one or more increment operations on a row.
-  * <p>
-  * Increments performed are done under row lock but reads do not take locks
-  * out so this can be seen partially complete by gets and scans.
-  * @param increment
-  * @param writeToWAL
-  * @return new keyvalues after increment
-  * @throws IOException
-  */
-  public Result increment(Increment increment, boolean writeToWAL)
-  throws IOException {
-    return increment(increment, null, writeToWAL);
-  }
-
-  /**
    *
    * Perform one or more increment operations on a row.
    * <p>
@@ -4803,8 +4586,6 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
-   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
-
    */
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
@@ -4826,7 +4607,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
-      lock(this.updatesLock.readLock());
+      this.updatesLock.readLock().lock();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4910,7 +4691,7 @@ public class HRegion implements HeapSize
       long after = EnvironmentEdgeManager.currentTimeMillis();
       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
     }
-
+    
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -4944,7 +4725,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = obtainRowLock(row);
-      lock(this.updatesLock.readLock());
+      this.updatesLock.readLock().lock();
       try {
         Store store = stores.get(family);
 
@@ -5037,8 +4818,8 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      35 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (7 * Bytes.SIZEOF_LONG) +
+      35 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+      (5 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5338,16 +5119,13 @@ public class HRegion implements HeapSize
    * #closeRegionOperation needs to be called in the try's finally block
    * Acquires a read lock and checks if the region is closing or closed.
    * @throws NotServingRegionException when the region is closing or closed
-   * @throws RegionTooBusyException if failed to get the lock in time
-   * @throws InterruptedIOException if interrupted while waiting for a lock
    */
-  public void startRegionOperation()
-      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+  private void startRegionOperation() throws NotServingRegionException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing");
     }
-    lock(lock.readLock());
+    lock.readLock().lock();
     if (this.closed.get()) {
       lock.readLock().unlock();
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5359,7 +5137,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startRegionOperation
    */
-  public void closeRegionOperation(){
+  private void closeRegionOperation(){
     lock.readLock().unlock();
   }
 
@@ -5369,17 +5147,15 @@ public class HRegion implements HeapSize
    * #closeBulkRegionOperation needs to be called in the try's finally block
    * Acquires a writelock and checks if the region is closing or closed.
    * @throws NotServingRegionException when the region is closing or closed
-   * @throws RegionTooBusyException if failed to get the lock in time
-   * @throws InterruptedIOException if interrupted while waiting for a lock
    */
   private void startBulkRegionOperation(boolean writeLockNeeded)
-      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
+  throws NotServingRegionException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing");
     }
-    if (writeLockNeeded) lock(lock.writeLock());
-    else lock(lock.readLock());
+    if (writeLockNeeded) lock.writeLock().lock();
+    else lock.readLock().lock();
     if (this.closed.get()) {
       if (writeLockNeeded) lock.writeLock().unlock();
       else lock.readLock().unlock();
@@ -5392,7 +5168,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startBulkRegionOperation
    */
-  private void closeBulkRegionOperation() {
+  private void closeBulkRegionOperation(){
     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
     else lock.readLock().unlock();
   }
@@ -5403,7 +5179,7 @@ public class HRegion implements HeapSize
    */
   private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
     if (numPutsWithoutWAL.getAndIncrement() == 0) {
-      LOG.info("writing data to region " + this +
+      LOG.info("writing data to region " + this + 
                " with WAL disabled. Data may be lost in the event of a crash.");
     }
 
@@ -5417,33 +5193,6 @@ public class HRegion implements HeapSize
     dataInMemoryWithoutWAL.addAndGet(putSize);
   }
 
-  private void lock(final Lock lock)
-      throws RegionTooBusyException, InterruptedIOException {
-    lock(lock, 1);
-  }
-
-  /**
-   * Try to acquire a lock.  Throw RegionTooBusyException
-   * if failed to get the lock in time. Throw InterruptedIOException
-   * if interrupted while waiting for the lock.
-   */
-  private void lock(final Lock lock, final int multiplier)
-      throws RegionTooBusyException, InterruptedIOException {
-    try {
-      final long waitTime = Math.min(maxBusyWaitDuration,
-        busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
-      if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
-        throw new RegionTooBusyException(
-          "failed to get a lock in " + waitTime + "ms");
-      }
-    } catch (InterruptedException ie) {
-      LOG.info("Interrupted while waiting for a lock");
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
-  }
-
   /**
    * Calls sync with the given transaction ID if the region's table is not
    * deferring it.
@@ -5483,6 +5232,7 @@ public class HRegion implements HeapSize
     }
   };
 
+
   /**
    * Facility for dumping and compacting catalog tables.
    * Only does catalog tables since these are only tables we for sure know
@@ -5515,11 +5265,11 @@ public class HRegion implements HeapSize
     final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
-    } finally {
+     } finally {
        log.close();
        // TODO: is this still right?
        BlockCache bc = new CacheConfig(c).getBlockCache();
        if (bc != null) bc.shutdown();
-    }
+     }
   }
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec 23 19:34:53 2012
@@ -44,7 +44,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,7 +96,6 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -167,7 +165,6 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.joda.time.field.MillisDurationField;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -236,7 +233,7 @@ public class HRegionServer implements HR
   // Server to handle client requests. Default access so can be accessed by
   // unit tests.
   RpcServer rpcServer;
-
+  
   // Server to handle client requests.
   private HBaseServer server;  
 
@@ -366,8 +363,6 @@ public class HRegionServer implements HR
    */
   private ClusterId clusterId = null;
 
-  private RegionServerCoprocessorHost rsHost;
-
   /**
    * Starts a HRegionServer at the default location
    *
@@ -439,10 +434,6 @@ public class HRegionServer implements HR
     this.rpcServer.setQosFunction(new QosFunction());
     this.startcode = System.currentTimeMillis();
 
-    // login the zookeeper client principal (if using security)
-    ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
-      "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
-
     // login the server principal (if using secure Hadoop)
     User.login(this.conf, "hbase.regionserver.keytab.file",
       "hbase.regionserver.kerberos.principal", this.isa.getHostName());
@@ -1022,7 +1013,6 @@ public class HRegionServer implements HR
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
-      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
       startServiceThreads();
       LOG.info("Serving as " + this.serverNameFromMasterPOV +
         ", RPC listening on " + this.isa +
@@ -1030,7 +1020,6 @@ public class HRegionServer implements HR
         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
       isOnline = true;
     } catch (Throwable e) {
-      LOG.warn("Exception in region server : ", e);
       this.isOnline = false;
       stop("Failed initialization");
       throw convertThrowableToIOE(cleanup(e, "Failed init"),
@@ -1106,7 +1095,8 @@ public class HRegionServer implements HR
         storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
         totalStaticIndexSizeKB, totalStaticBloomSizeKB,
         (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
-        totalCompactingKVs, currentCompactedKVs);
+        totalCompactingKVs, currentCompactedKVs,
+        r.getCoprocessorHost().getCoprocessors());
   }
 
   /**
@@ -1586,7 +1576,6 @@ public class HRegionServer implements HR
     this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
         this.getConfiguration(), this.getServerName().toString());
     splitLogWorker.start();
-    
   }
 
   /**
@@ -1654,15 +1643,10 @@ public class HRegionServer implements HR
 
   @Override
   public void stop(final String msg) {
-    try {
-      this.rsHost.preStop(msg);
-      this.stopped = true;
-      LOG.info("STOPPED: " + msg);
-      // Wakes run() if it is sleeping
-      sleeper.skipSleepCycle();
-    } catch (IOException exp) {
-      LOG.warn("The region server did not stop", exp);
-    }
+    this.stopped = true;
+    LOG.info("STOPPED: " + msg);
+    // Wakes run() if it is sleeping
+    sleeper.skipSleepCycle();
   }
 
   public void waitForServerOnline(){
@@ -2446,32 +2430,23 @@ public class HRegionServer implements HR
         }
       }
 
-      MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
-      region.startRegionOperation();
-      try {
-        int i = 0;
-        synchronized(s) {
-          for (; i < nbRows
-              && currentScanResultSize < maxScannerResultSize; i++) {
-            // Collect values to be returned here
-            boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
-            if (!values.isEmpty()) {
-              for (KeyValue kv : values) {
-                currentScanResultSize += kv.heapSize();
-              }
-              results.add(new Result(values));
-            }
-            if (!moreRows) {
-              break;
-            }
-            values.clear();
+      for (int i = 0; i < nbRows
+          && currentScanResultSize < maxScannerResultSize; i++) {
+        requestCount.incrementAndGet();
+        // Collect values to be returned here
+        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+        if (!values.isEmpty()) {
+          for (KeyValue kv : values) {
+            currentScanResultSize += kv.heapSize();
           }
+          results.add(new Result(values));
         }
-        requestCount.addAndGet(i);
-        region.readRequestsCount.add(i);
-      } finally {
-        region.closeRegionOperation();
+        if (!moreRows) {
+          break;
+        }
+        values.clear();
       }
+
       // coprocessor postNext hook
       if (region != null && region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
@@ -2614,9 +2589,6 @@ public class HRegionServer implements HR
     return -1;
   }
 
-  /**
-   * @deprecated {@link RowLock} and associated operations are deprecated.
-   */
   public long lockRow(byte[] regionName, byte[] row) throws IOException {
     checkOpen();
     NullPointerException npe = null;
@@ -2633,9 +2605,6 @@ public class HRegionServer implements HR
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().preLockRow(regionName, row);
-      }
       Integer r = region.obtainRowLock(row);
       long lockId = addRowLock(r, region);
       LOG.debug("Row lock " + lockId + " explicitly acquired by client");
@@ -2679,9 +2648,6 @@ public class HRegionServer implements HR
     return rl;
   }
 
-  /**
-   * @deprecated {@link RowLock} and associated operations are deprecated.
-   */
   @Override
   @QosPriority(priority=HConstants.HIGH_QOS)
   public void unlockRow(byte[] regionName, long lockId) throws IOException {
@@ -2700,9 +2666,6 @@ public class HRegionServer implements HR
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().preUnLockRow(regionName, lockId);
-      }
       String lockName = String.valueOf(lockId);
       Integer r = rowlocks.remove(lockName);
       if (r == null) {
@@ -2879,11 +2842,6 @@ public class HRegionServer implements HR
     final int versionOfClosingNode)
   throws IOException {
     checkOpen();
-    //Check for permissions to close.
-    HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
-    if (actualRegion.getCoprocessorHost() != null) {
-      actualRegion.getCoprocessorHost().preClose(false);
-    }
     LOG.info("Received close region: " + region.getRegionNameAsString() +
       ". Version of ZK closing node:" + versionOfClosingNode);
     boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
@@ -2931,17 +2889,6 @@ public class HRegionServer implements HR
    */
   protected boolean closeRegion(HRegionInfo region, final boolean abort,
       final boolean zk, final int versionOfClosingNode) {
-    
-    HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
-    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() !=null)){
-      try {
-        actualRegion.getCoprocessorHost().preClose(abort);
-      } catch (IOException e) {
-        LOG.warn(e);
-        return false;
-      }
-    }
-    
     if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
       LOG.warn("Received close for region we are already opening or closing; " +
           region.getEncodedName());
@@ -3642,10 +3589,6 @@ public class HRegionServer implements HR
     return this.zooKeeper;
   }
 
-  public RegionServerCoprocessorHost getCoprocessorHost(){
-    return this.rsHost;
-  }
-
 
   public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
     return this.regionsInTransitionInRS;
@@ -3823,13 +3766,8 @@ public class HRegionServer implements HR
 
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getCoprocessors() {
-    TreeSet<String> coprocessors = new TreeSet<String>(
-        this.hlog.getCoprocessorHost().getCoprocessors());
-    Collection<HRegion> regions = getOnlineRegionsLocalContext();
-    for (HRegion region: regions) {
-      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
-    }
-    return coprocessors.toArray(new String[0]);
+    HServerLoad hsl = buildServerLoad();
+    return hsl == null? null: hsl.getCoprocessors();
   }
 
   /**

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Dec 23 19:34:53 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -264,7 +265,7 @@ public class RegionCoprocessorHost
   /**
    * Invoked before a region open
    */
-  public void preOpen(){
+  public void preOpen() {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -284,7 +285,7 @@ public class RegionCoprocessorHost
   /**
    * Invoked after a region open
    */
-  public void postOpen(){
+  public void postOpen() {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -305,7 +306,7 @@ public class RegionCoprocessorHost
    * Invoked before a region is closed
    * @param abortRequested true if the server is aborting
    */
-  public void preClose(boolean abortRequested) throws IOException {
+  public void preClose(boolean abortRequested) {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -313,7 +314,7 @@ public class RegionCoprocessorHost
         try {
           ((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
         } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
+          handleCoprocessorThrowableNoRethrow(env, e);
         }
       }
     }
@@ -323,7 +324,7 @@ public class RegionCoprocessorHost
    * Invoked after a region is closed
    * @param abortRequested true if the server is aborting
    */
-  public void postClose(boolean abortRequested){
+  public void postClose(boolean abortRequested) {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -1482,31 +1483,5 @@ public class RegionCoprocessorHost
 
     return hasLoaded;
   }
-  
-  public void preLockRow(byte[] regionName, byte[] row) throws IOException {
-    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
-    for (RegionEnvironment env : coprocessors) {
-      if (env.getInstance() instanceof RegionObserver) {
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row);
-        if (ctx.shouldComplete()) {
-          break;
-        }
-      }
-    }
-  }
-
-  public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
-    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
-    for (RegionEnvironment env : coprocessors) {
-      if (env.getInstance() instanceof RegionObserver) {
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId);
-        if (ctx.shouldComplete()) {
-          break;
-        }
-      }
-    }
-  }
 
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sun Dec 23 19:34:53 2012
@@ -20,10 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
 
 /**
  * RegionScanner describes iterators over rows in an HRegion.
@@ -52,50 +49,4 @@ public interface RegionScanner extends I
    */
   public boolean reseek(byte[] row) throws IOException;
 
-  /**
-   * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
-   */
-  public long getMvccReadPoint();
-
-  /**
-   * Grab the next row's worth of values with the default limit on the number of values
-   * to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
-   * See {@link #nextRaw(List, int, String)}
-   * @param result return output array
-   * @param metric the metric name
-   * @return true if more rows exist after this one, false if scanner is done
-   * @throws IOException e
-   */
-  public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
-
-  /**
-   * Grab the next row's worth of values with a limit on the number of values
-   * to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
-   * Example:
-   * <code><pre>
-   * HRegion region = ...;
-   * RegionScanner scanner = ...
-   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
-   * region.startRegionOperation();
-   * try {
-   *   synchronized(scanner) {
-   *     ...
-   *     boolean moreRows = scanner.nextRaw(values);
-   *     ...
-   *   }
-   * } finally {
-   *   region.closeRegionOperation();
-   * }
-   * </pre></code>
-   * @param result return output array
-   * @param limit limit on row count to get
-   * @param metric the metric name
-   * @return true if more rows exist after this one, false if scanner is done
-   * @throws IOException e
-   */
-  public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sun Dec 23 19:34:53 2012
@@ -82,8 +82,6 @@ public class ScanQueryMatcher {
   /* row is not private for tests */
   /** Row the query is on */
   byte [] row;
-  int rowOffset;
-  short rowLength;
   
   /**
    * Oldest put in any of the involved store files
@@ -224,7 +222,7 @@ public class ScanQueryMatcher {
     short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
     offset += Bytes.SIZEOF_SHORT;
 
-    int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
+    int ret = this.rowComparator.compareRows(row, 0, row.length,
         bytes, offset, rowLength);
     if (ret <= -1) {
       return MatchCode.DONE;
@@ -387,10 +385,8 @@ public class ScanQueryMatcher {
    * Set current row
    * @param row
    */
-  public void setRow(byte [] row, int offset, short length) {
+  public void setRow(byte [] row) {
     this.row = row;
-    this.rowOffset = offset;
-    this.rowLength = length;
     reset();
   }
 

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Dec 23 19:34:53 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -131,6 +132,9 @@ public class Store extends SchemaConfigu
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final String storeNameStr;
+  private CompactionProgress progress;
+  private final int compactionKVMax;
   private final boolean verifyBulkLoads;
 
   /* The default priority for user-specified compaction requests.
@@ -154,6 +158,10 @@ public class Store extends SchemaConfigu
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final int blocksize;
+  /** Compression algorithm for flush files and minor compaction */
+  private final Compression.Algorithm compression;
+  /** Compression algorithm for major compaction */
+  private final Compression.Algorithm compactionCompression;
   private HFileDataBlockEncoder dataBlockEncoder;
 
   /** Checksum configuration */
@@ -163,8 +171,6 @@ public class Store extends SchemaConfigu
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
-  private final Compactor compactor;
-
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -179,16 +185,25 @@ public class Store extends SchemaConfigu
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
     FileSystem fs, Configuration conf)
   throws IOException {
-    super(conf, region.getRegionInfo().getTableNameAsString(),
+    super(conf, region.getTableDesc().getNameAsString(),
         Bytes.toString(family.getName()));
-    HRegionInfo info = region.getRegionInfo();
+    HRegionInfo info = region.regionInfo;
     this.fs = fs;
-    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    this.homedir = createStoreHomeDir(this.fs, p);
+    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+    if (!this.fs.exists(this.homedir)) {
+      if (!this.fs.mkdirs(this.homedir))
+        throw new IOException("Failed create of: " + this.homedir.toString());
+    }
     this.region = region;
     this.family = family;
     this.conf = conf;
     this.blocksize = family.getBlocksize();
+    this.compression = family.getCompression();
+    // avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    this.compactionCompression =
+      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
+        family.getCompactionCompression() : this.compression;
 
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -213,6 +228,7 @@ public class Store extends SchemaConfigu
         "ms in store " + this);
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
+    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -229,8 +245,10 @@ public class Store extends SchemaConfigu
       this.region.memstoreFlushSize);
     this.maxCompactSize
       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+    this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
 
-    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
+    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
+        false);
 
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
@@ -242,47 +260,6 @@ public class Store extends SchemaConfigu
     this.checksumType = getChecksumType(conf);
     // initialize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction tool instance
-    this.compactor = new Compactor(this.conf);
-  }
-
-  /**
-   * @param family
-   * @return
-   */
-  long getTTL(final HColumnDescriptor family) {
-    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
-    long ttl = family.getTimeToLive();
-    if (ttl == HConstants.FOREVER) {
-      // Default is unlimited ttl.
-      ttl = Long.MAX_VALUE;
-    } else if (ttl == -1) {
-      ttl = Long.MAX_VALUE;
-    } else {
-      // Second -> ms adjust for user data
-      ttl *= 1000;
-    }
-    return ttl;
-  }
-
-  /**
-   * Create this store's homedir
-   * @param fs
-   * @param homedir
-   * @return Return <code>homedir</code>
-   * @throws IOException
-   */
-  Path createStoreHomeDir(final FileSystem fs,
-      final Path homedir) throws IOException {
-    if (!fs.exists(homedir)) {
-      if (!fs.mkdirs(homedir))
-        throw new IOException("Failed create of: " + homedir.toString());
-    }
-    return homedir;
-  }
-
-  FileSystem getFileSystem() {
-    return this.fs;
   }
 
   /**
@@ -343,7 +320,7 @@ public class Store extends SchemaConfigu
    * Return the directory in which this store stores its
    * StoreFiles
    */
-  Path getHomedir() {
+  public Path getHomedir() {
     return homedir;
   }
 
@@ -362,10 +339,6 @@ public class Store extends SchemaConfigu
     this.dataBlockEncoder = blockEncoder;
   }
 
-  FileStatus [] getStoreFiles() throws IOException {
-    return FSUtils.listStatus(this.fs, this.homedir, null);
-  }
-
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
@@ -373,7 +346,7 @@ public class Store extends SchemaConfigu
    */
   private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = getStoreFiles();
+    FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
 
     if (files == null || files.length == 0) {
       return results;
@@ -664,7 +637,7 @@ public class Store extends SchemaConfigu
           storeFileCloserThreadPool.shutdownNow();
         }
       }
-      LOG.info("Closed " + this);
+      LOG.debug("closed " + this.storeNameStr);
       return result;
     } finally {
       this.lock.writeLock().unlock();
@@ -750,7 +723,6 @@ public class Store extends SchemaConfigu
       scanner = cpScanner;
     }
     try {
-      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
       // if we fail.
@@ -764,7 +736,7 @@ public class Store extends SchemaConfigu
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
           do {
-            hasMore = scanner.next(kvs, compactionKVMax);
+            hasMore = scanner.next(kvs, this.compactionKVMax);
             if (!kvs.isEmpty()) {
               for (KeyValue kv : kvs) {
                 // If we know that this KV is going to be included always, then let us
@@ -856,7 +828,7 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+    return createWriterInTmp(maxKeyCount, this.compression, false);
   }
 
   /*
@@ -1009,12 +981,16 @@ public class Store extends SchemaConfigu
    * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
-   * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  StoreFile compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) return null;
-    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
+  void compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) {
+      return;
+    }
+    Preconditions.checkArgument(cr.getStore().toString()
+        .equals(this.toString()));
+
     List<StoreFile> filesToCompact = cr.getFiles();
+
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -1026,26 +1002,19 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this + " of "
+        + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
     try {
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+          maxId);
       // Move the compaction into place.
-      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
-        sf = completeCompaction(filesToCompact, writer);
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf);
-        }
-      } else {
-        // Create storefile around what we wrote with a reader on it.
-        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
-          this.family.getBloomFilterType(), this.dataBlockEncoder);
-        sf.createReader();
+      sf = completeCompaction(filesToCompact, writer);
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postCompact(this, sf);
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1054,7 +1023,7 @@ public class Store extends SchemaConfigu
     }
 
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this + " of "
+        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " +
         (sf == null ? "none" : sf.getPath().getName()) +
@@ -1062,7 +1031,6 @@ public class Store extends SchemaConfigu
           StringUtils.humanReadableInt(sf.getReader().length()))
         + "; total size for store is "
         + StringUtils.humanReadableInt(storeSize));
-    return sf;
   }
 
   /**
@@ -1102,8 +1070,7 @@ public class Store extends SchemaConfigu
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, isMajor, maxId);
+      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (region.getCoprocessorHost() != null) {
@@ -1152,10 +1119,10 @@ public class Store extends SchemaConfigu
   }
 
   /** getter for CompactionProgress object
-   * @return CompactionProgress object; can be null
+   * @return CompactionProgress object
    */
   public CompactionProgress getCompactionProgress() {
-    return this.compactor.getProgress();
+    return this.progress;
   }
 
   /*
@@ -1207,19 +1174,19 @@ public class Store extends SchemaConfigu
         if (sf.isMajorCompaction() &&
             (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
+            LOG.debug("Skipping major compaction of " + this.storeNameStr +
                 " because one (major) compacted file only and oldestTime " +
                 oldest + "ms is < ttl=" + this.ttl);
           }
         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-          LOG.debug("Major compaction triggered on store " + this +
+          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
             ", because keyvalues outdated; time since last major compaction " +
             (now - lowTimestamp) + "ms");
           result = true;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
+          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
               "; time since last major compaction " + (now - lowTimestamp) + "ms");
         }
         result = true;
@@ -1409,12 +1376,12 @@ public class Store extends SchemaConfigu
              compactSelection.getFilesToCompact().get(pos).getReader().length()
                > maxCompactSize &&
              !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      if (pos != 0) compactSelection.clearSubList(0, pos);
+      compactSelection.clearSubList(0, pos);
     }
 
     if (compactSelection.getFilesToCompact().isEmpty()) {
       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this + ": no store files to compact");
+        this.storeNameStr + ": no store files to compact");
       compactSelection.emptyFileList();
       return compactSelection;
     }
@@ -1501,7 +1468,7 @@ public class Store extends SchemaConfigu
       // if we don't have enough files to compact, just wait
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this
+          LOG.debug("Skipped compaction of " + this.storeNameStr
             + ".  Only " + (end - start) + " file(s) of size "
             + StringUtils.humanReadableInt(totalSize)
             + " have met compaction criteria.");
@@ -1528,6 +1495,149 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * Do a minor/major compaction on an explicit set of storefiles in a Store.
+   * Uses the scan infrastructure to make it easy.
+   *
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @param maxId Readers maximum sequence id.
+   * @return Product of compaction or null if all cells expired or deleted and
+   * nothing made it through the compaction.
+   * @throws IOException
+   */
+  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+                               final boolean majorCompaction, final long maxId)
+      throws IOException {
+    // calculate maximum key count after compaction (for blooms)
+    int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file : filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r != null) {
+        // NOTE: getFilterEntries could cause under-sized blooms if the user
+        //       switches bloom type (e.g. from ROW to ROWCOL)
+        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
+          ? r.getFilterEntries() : r.getEntries();
+        maxKeyCount += keyCount;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compacting " + file +
+            ", keycount=" + keyCount +
+            ", bloomtype=" + r.getBloomFilterType().toString() +
+            ", size=" + StringUtils.humanReadableInt(r.length()) +
+            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
+        }
+      }
+      // For major compactions calculate the earliest put timestamp
+      // of all involved storefiles. This is used to remove 
+      // family delete marker during the compaction.
+      if (majorCompaction) {
+        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // there's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+    }
+
+    // keep track of compaction progress
+    progress = new CompactionProgress(maxKeyCount);
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+      .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = region.getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        if (getHRegion().getCoprocessorHost() != null) {
+          scanner = getHRegion()
+              .getCoprocessorHost()
+              .preCompactScannerOpen(this, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
+            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+            smallestReadPoint, earliestPutTs);
+        }
+        if (getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+            getHRegion().getCoprocessorHost().preCompact(this, scanner);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return null;
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, this.compactionKVMax);
+          if (writer == null && !kvs.isEmpty()) {
+            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+                true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (Store.closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > Store.closeCheckInterval) {
+                  bytesWritten = 0;
+                  if (!this.region.areWritesEnabled()) {
+                    writer.close();
+                    fs.delete(writer.getPath(), false);
+                    throw new InterruptedIOException(
+                        "Aborting compaction of store " + this +
+                        " in region " + this.region +
+                        " because user requested stop.");
+                  }
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+      }
+    }
+    return writer;
+  }
+
+  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
@@ -1631,7 +1741,7 @@ public class Store extends SchemaConfigu
 
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
-      LOG.error("Failed replacing compacted files in " + this +
+      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
         ". Compacted file is " + (result == null? "none": result.toString()) +
         ".  Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
@@ -1917,7 +2027,7 @@ public class Store extends SchemaConfigu
         return mk.getRow();
       }
     } catch(IOException e) {
-      LOG.warn("Failed getting store size for " + this, e);
+      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1970,7 +2080,7 @@ public class Store extends SchemaConfigu
 
   @Override
   public String toString() {
-    return getColumnFamilyName();
+    return this.storeNameStr;
   }
 
   /**
@@ -2086,7 +2196,7 @@ public class Store extends SchemaConfigu
   }
 
   HRegionInfo getHRegionInfo() {
-    return this.region.getRegionInfo();
+    return this.region.regionInfo;
   }
 
   /**
@@ -2214,8 +2324,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Dec 23 19:34:53 2012
@@ -320,7 +320,7 @@ public class StoreFile extends SchemaCon
    * @return Calculated path to parent region file.
    * @throws IOException
    */
-  public static Path getReferredToFile(final Path p) {
+  static Path getReferredToFile(final Path p) {
     Matcher m = REF_NAME_PARSER.matcher(p.getName());
     if (m == null || !m.matches()) {
       LOG.warn("Failed match of store file name " + p.toString());

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Dec 23 19:34:53 2012
@@ -340,11 +340,8 @@ public class StoreScanner extends NonLaz
 
     // only call setRow if the row changes; avoids confusing the query matcher
     // if scanning intra-row
-    byte[] row = peeked.getBuffer();
-    int offset = peeked.getRowOffset();
-    short length = peeked.getRowLength();
-    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
-      matcher.setRow(row, offset, length);
+    if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+      matcher.setRow(peeked.getRow());
     }
 
     KeyValue kv;
@@ -524,12 +521,9 @@ public class StoreScanner extends NonLaz
     if (kv == null) {
       kv = lastTopKey;
     }
-    byte[] row = kv.getBuffer();
-    int offset = kv.getRowOffset();
-    short length = kv.getRowLength();
-    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+    if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
       matcher.reset();
-      matcher.setRow(row, offset, length);
+      matcher.setRow(kv.getRow());
     }
   }
 

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Sun Dec 23 19:34:53 2012
@@ -49,4 +49,5 @@ public class CompactionProgress {
   public float getProgressPct() {
     return currentCompactedKVs / totalCompactingKVs;
   }
+
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Sun Dec 23 19:34:53 2012
@@ -143,12 +143,7 @@ public class Compressor {
       // the status byte also acts as the higher order byte of the dictionary
       // entry
       short dictIdx = toShort(status, in.readByte());
-      byte[] entry;
-      try {
-        entry = dict.getEntry(dictIdx);
-      } catch (Exception ex) {
-        throw new IOException("Unable to uncompress the log entry", ex);
-      }
+      byte[] entry = dict.getEntry(dictIdx);
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index "
             + dictIdx);

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sun Dec 23 19:34:53 2012
@@ -167,7 +167,6 @@ public class HLog implements Syncable {
     Entry next(Entry reuse) throws IOException;
     void seek(long pos) throws IOException;
     long getPosition() throws IOException;
-    void reset() throws IOException;
   }
 
   public interface Writer {
@@ -696,18 +695,15 @@ public class HLog implements Syncable {
 
   /**
    * Get a reader for the WAL.
-   * The proper way to tail a log that can be under construction is to first use this method
-   * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
-   * take care of keeping implementation-specific context (like compression).
    * @param fs
    * @param path
    * @param conf
    * @return A WAL reader.  Close when done with it.
    * @throws IOException
    */
-  public static Reader getReader(final FileSystem fs, final Path path,
-                                 Configuration conf)
-      throws IOException {
+  public static Reader getReader(final FileSystem fs,
+    final Path path, Configuration conf)
+  throws IOException {
     try {
 
       if (logReaderClass == null) {

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1425513&r1=1425512&r2=1425513&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sun Dec 23 19:34:53 2012
@@ -139,17 +139,15 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
-  FileSystem fs;
 
   // Needed logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
-  boolean emptyCompressionContext = true;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
-  protected CompressionContext compressionContext = null;
+  private CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -175,7 +173,6 @@ public class SequenceFileLogReader imple
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
-    this.fs = fs;
 
     // If compression is enabled, new dictionaries are created here.
     boolean compression = reader.isWALCompressionEnabled();
@@ -240,22 +237,11 @@ public class SequenceFileLogReader imple
       throw addFileInfoToException(ioe);
     }
     edit++;
-    if (compressionContext != null && emptyCompressionContext) {
-      emptyCompressionContext = false;
-    }
     return b? e: null;
   }
 
   @Override
   public void seek(long pos) throws IOException {
-    if (compressionContext != null && emptyCompressionContext) {
-      while (next() != null) {
-        if (getPosition() == pos) {
-          emptyCompressionContext = false;
-          break;
-        }
-      }
-    }
     try {
       reader.seek(pos);
     } catch (IOException ioe) {
@@ -300,11 +286,4 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
-
-  @Override
-  public void reset() throws IOException {
-    // Resetting the reader lets us see newly added data if the file is being written to
-    // We also keep the same compressionContext which was previously populated for this file
-    reader = new WALReader(fs, path, conf);
-  }
 }
\ No newline at end of file



Mime
View raw message