hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1446147 [14/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...
Date Thu, 14 Feb 2013 12:58:21 GMT
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Feb 14 12:58:12 2013
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.text.ParseException;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -72,6 +70,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.FailedSanityCheckException;
@@ -96,11 +95,8 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -111,10 +107,9 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -141,11 +136,9 @@ import org.apache.hadoop.util.StringUtil
 import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.MutableClassToInstanceMap;
 import com.google.common.io.Closeables;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
@@ -194,6 +187,9 @@ public class HRegion implements HeapSize
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   private static final String MERGEDIR = ".merges";
 
+  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
+      "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -218,13 +214,6 @@ public class HRegion implements HeapSize
   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
       Bytes.BYTES_RAWCOMPARATOR);
 
-  // Registered region protocol handlers
-  private ClassToInstanceMap<CoprocessorProtocol>
-      protocolHandlers = MutableClassToInstanceMap.create();
-
-  private Map<String, Class<? extends CoprocessorProtocol>>
-      protocolHandlerNames = Maps.newHashMap();
-
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
 
@@ -298,6 +287,17 @@ public class HRegion implements HeapSize
   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
 
   /**
+   * The sequence ID that was encountered when this region was opened.
+   */
+  private long openSeqNum = HConstants.NO_SEQNUM;
+
+  /**
+   * The default setting for whether to enable on-demand CF loading for
+   * scan requests to this region. Requests can override it.
+   */
+  private boolean isLoadingCfsOnDemandDefault = false;
+
+  /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included  in every
    * read operation.
@@ -394,35 +394,6 @@ public class HRegion implements HeapSize
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
 
   /**
-   * Should only be used for testing purposes
-   */
-  public HRegion(){
-    this.tableDir = null;
-    this.blockingMemStoreSize = 0L;
-    this.conf = null;
-    this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
-    this.rsServices = null;
-    this.baseConf = null;
-    this.fs = null;
-    this.timestampSlop = HConstants.LATEST_TIMESTAMP;
-    this.rowProcessorTimeout = DEFAULT_ROW_PROCESSOR_TIMEOUT;
-    this.memstoreFlushSize = 0L;
-    this.log = null;
-    this.regiondir = null;
-    this.regionInfo = null;
-    this.htableDescriptor = null;
-    this.threadWakeFrequency = 0L;
-    this.coprocessorHost = null;
-    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
-
-    this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
-    this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
-    this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-    this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
-    this.maxBusyWaitMultiplier = 2;
-  }
-
-  /**
    * HRegion copy constructor. Useful when reopening a closed region (normally
    * for unit tests)
    * @param other original object
@@ -458,6 +429,9 @@ public class HRegion implements HeapSize
   public HRegion(Path tableDir, HLog log, FileSystem fs,
       Configuration confParam, final HRegionInfo regionInfo,
       final HTableDescriptor htd, RegionServerServices rsServices) {
+    if (htd == null) {
+      throw new IllegalArgumentException("Need table descriptor");
+    }
     this.tableDir = tableDir;
     this.comparator = regionInfo.getComparator();
     this.log = log;
@@ -469,9 +443,12 @@ public class HRegion implements HeapSize
     this.baseConf = confParam;
     this.conf = new CompoundConfiguration()
       .add(confParam)
-      .add(htd.getValues());
+      .addStringMap(htd.getConfiguration())
+      .addWritableMap(htd.getValues());
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
+
+    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
     this.regionInfo = regionInfo;
     this.htableDescriptor = htd;
     this.rsServices = rsServices;
@@ -531,9 +508,9 @@ public class HRegion implements HeapSize
     if (this.htableDescriptor == null) return;
     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
 
-    if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
+    if (flushSize <= 0) {
       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
     }
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
@@ -600,8 +577,7 @@ public class HRegion implements HeapSize
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
 
-    if (this.htableDescriptor != null &&
-        !htableDescriptor.getFamilies().isEmpty()) {
+    if (!htableDescriptor.getFamilies().isEmpty()) {
       // initialize the thread pool for opening stores in parallel.
       ThreadPoolExecutor storeOpenerThreadPool =
         getStoreOpenAndCloseThreadPool(
@@ -657,9 +633,7 @@ public class HRegion implements HeapSize
     // being split but we crashed in the middle of it all.
     SplitTransaction.cleanupAnySplitDetritus(this);
     FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
-    if (this.htableDescriptor != null) {
-      this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
-    }
+    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
 
     this.writestate.flushRequested = false;
     this.writestate.compacting = 0;
@@ -944,7 +918,7 @@ public class HRegion implements HeapSize
     return isAvailable() && !hasReferences();
   }
 
-  boolean areWritesEnabled() {
+  public boolean areWritesEnabled() {
     synchronized(this.writestate) {
       return this.writestate.writesEnabled;
     }
@@ -954,6 +928,10 @@ public class HRegion implements HeapSize
      return mvcc;
    }
 
+   public boolean isLoadingCfsOnDemandDefault() {
+     return this.isLoadingCfsOnDemandDefault;
+   }
+
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
    * service any more calls.
@@ -1282,6 +1260,10 @@ public class HRegion implements HeapSize
     return getTmpDir(getRegionDir());
   }
 
+  /**
+   * Get the temporary directory for the specified region. This directory
+   * will have its contents removed when the region is reopened.
+   */
   static Path getTmpDir(Path regionDir) {
     return new Path(regionDir, REGION_TEMP_SUBDIR);
   }
@@ -1575,17 +1557,26 @@ public class HRegion implements HeapSize
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
-    long completeSeqId = -1L;
+    long flushSeqId = -1L;
     try {
       // Record the mvcc for all transactions in progress.
       w = mvcc.beginMemstoreInsert();
       mvcc.advanceMemstore(w);
 
-      sequenceId = (wal == null)? myseqid:
-        wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
-      completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId);
+      if (wal != null) {
+        Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
+        if (startSeqId == null) {
+          status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName()
+              + "] - WAL is going away");
+          return false;
+        }
+        flushSeqId = startSeqId.longValue();
+      } else {
+        flushSeqId = myseqid;
+      }
+
       for (Store s : stores.values()) {
-        storeFlushers.add(s.getStoreFlusher(completeSeqId));
+        storeFlushers.add(s.getStoreFlusher(flushSeqId));
       }
 
       // prepare flush (take a snapshot)
@@ -1654,22 +1645,14 @@ public class HRegion implements HeapSize
       throw dse;
     }
 
-    // If we get to here, the HStores have been written. If we get an
-    // error in completeCacheFlush it will release the lock it is holding
-
-    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
-    //     This tells future readers that the HStores were emitted correctly,
-    //     and that all updates to the log for this regionName that have lower
-    //     log-sequence-ids can be safely ignored.
+    // If we get to here, the HStores have been written.
     if (wal != null) {
-      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
-        regionInfo.getTableName(), completeSeqId,
-        this.getRegionInfo().isMetaRegion());
+      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes());
     }
 
     // Update the last flushed sequence id for region
     if (this.rsServices != null) {
-      completeSequenceId = completeSeqId;
+      completeSequenceId = flushSeqId;
     }
 
     // C. Finally notify anyone waiting on memstore to clear:
@@ -1694,18 +1677,6 @@ public class HRegion implements HeapSize
     return compactionRequested;
   }
 
-   /**
-   * Get the sequence number to be associated with this cache flush. Used by
-   * TransactionalRegion to not complete pending transactions.
-   *
-   *
-   * @param currentSequenceId
-   * @return sequence id to complete the cache flush with
-   */
-  protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
-    return currentSequenceId;
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
@@ -1754,7 +1725,7 @@ public class HRegion implements HeapSize
       if (key != null) {
         Get get = new Get(key.getRow());
         get.addFamily(family);
-        result = get(get, null);
+        result = get(get);
       }
       if (coprocessorHost != null) {
         coprocessorHost.postGetClosestRowBefore(row, family, result);
@@ -1807,7 +1778,7 @@ public class HRegion implements HeapSize
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
-    return new RegionScannerImpl(scan, additionalScanners);
+    return new RegionScannerImpl(scan, additionalScanners, this);
   }
 
   /*
@@ -1835,28 +1806,19 @@ public class HRegion implements HeapSize
   //////////////////////////////////////////////////////////////////////////////
   /**
    * @param delete delete object
-   * @param lockid existing lock id, or null for grab a lock
    * @param writeToWAL append to the write ahead lock or not
    * @throws IOException read exceptions
    */
-  public void delete(Delete delete, Integer lockid, boolean writeToWAL)
+  public void delete(Delete delete, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
     checkResources();
-    Integer lid = null;
     startRegionOperation();
     this.writeRequestsCount.increment();
     try {
       byte [] row = delete.getRow();
-      // If we did not pass an existing row lock, obtain a new one
-      lid = getLock(lockid, row, true);
-
-      try {
-        // All edits for the given row (across all column families) must happen atomically.
-        doBatchMutate(delete, lid);
-      } finally {
-        if(lockid == null) releaseRowLock(lid);
-      }
+      // All edits for the given row (across all column families) must happen atomically.
+      doBatchMutate(delete, null);
     } finally {
       closeRegionOperation();
     }
@@ -1881,7 +1843,7 @@ public class HRegion implements HeapSize
    * Setup correct timestamps in the KVs in Delete object.
    * Caller should have the row and region locks.
    * @param familyMap
-   * @param now
+   * @param byteNow
    * @throws IOException
    */
   void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
@@ -1936,36 +1898,15 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   public void put(Put put) throws IOException {
-    this.put(put, null, put.getWriteToWAL());
-  }
-
-  /**
-   * @param put
-   * @param writeToWAL
-   * @throws IOException
-   */
-  public void put(Put put, boolean writeToWAL) throws IOException {
-    this.put(put, null, writeToWAL);
-  }
-
-  /**
-   * @param put
-   * @param lockid
-   * @throws IOException
-   */
-  public void put(Put put, Integer lockid) throws IOException {
-    this.put(put, lockid, put.getWriteToWAL());
+    this.put(put, put.getWriteToWAL());
   }
 
-
-
   /**
    * @param put
-   * @param lockid
    * @param writeToWAL
    * @throws IOException
    */
-  public void put(Put put, Integer lockid, boolean writeToWAL)
+  public void put(Put put, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
 
@@ -1983,15 +1924,9 @@ public class HRegion implements HeapSize
       // See HRegionServer#RegionListener for how the expire on HRegionServer
       // invokes a HRegion#abort.
       byte [] row = put.getRow();
-      // If we did not pass an existing row lock, obtain a new one
-      Integer lid = getLock(lockid, row, true);
 
-      try {
-        // All edits for the given row (across all column families) must happen atomically.
-        doBatchMutate(put, lid);
-      } finally {
-        if(lockid == null) releaseRowLock(lid);
-      }
+      // All edits for the given row (across all column families) must happen atomically.
+      doBatchMutate(put, null);
     } finally {
       closeRegionOperation();
     }
@@ -2022,6 +1957,10 @@ public class HRegion implements HeapSize
 
   /**
    * Perform a batch put with no pre-specified locks
+   * @see HRegion#batchMutate(Pair[])
    */
   public OperationStatus[] put(Put[] puts) throws IOException {
     @SuppressWarnings("unchecked")
@@ -2124,10 +2063,6 @@ public class HRegion implements HeapSize
     Set<byte[]> deletesCfSet = null;
 
     WALEdit walEdit = new WALEdit();
-
-    long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
-
-
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
     boolean walSyncSuccessful = false;
@@ -2396,7 +2331,6 @@ public class HRegion implements HeapSize
 
       if (noOfPuts > 0) {
         // There were some Puts in the batch.
-        double noOfMutations = noOfPuts + noOfDeletes;
         if (this.metricsRegion != null) {
           this.metricsRegion.updatePut();
         }
@@ -2429,14 +2363,14 @@ public class HRegion implements HeapSize
    * @param qualifier
    * @param compareOp
    * @param comparator
-   * @param lockId
+   * @param w
    * @param writeToWAL
    * @throws IOException
    * @return true if the new put was executed, false otherwise
    */
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
-      Integer lockId, boolean writeToWAL)
+      boolean writeToWAL)
   throws IOException{
     checkReadOnly();
     //TODO, add check for value length or maybe even better move this to the
@@ -2452,13 +2386,12 @@ public class HRegion implements HeapSize
 
     startRegionOperation();
     try {
-      RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
-      Get get = new Get(row, lock);
+      Get get = new Get(row);
       checkFamily(family);
       get.addColumn(family, qualifier);
 
       // Lock row
-      Integer lid = getLock(lockId, get.getRow(), true);
+      Integer lid = getLock(null, get.getRow(), true);
       // wait for all previous transactions to complete (with lock held)
       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
       List<KeyValue> result = null;
@@ -2511,7 +2444,7 @@ public class HRegion implements HeapSize
         this.checkAndMutateChecksFailed.increment();
         return false;
       } finally {
-        if(lockId == null) releaseRowLock(lid);
+        releaseRowLock(lid);
       }
     } finally {
       closeRegionOperation();
@@ -2690,7 +2623,7 @@ public class HRegion implements HeapSize
   * @param now
    * @throws IOException
    */
-  private void put(final byte [] row, byte [] family, List<KeyValue> edits, Integer lid)
+  private void put(final byte [] row, byte [] family, List<KeyValue> edits)
   throws IOException {
     Map<byte[], List<KeyValue>> familyMap;
     familyMap = new HashMap<byte[], List<KeyValue>>();
@@ -2700,9 +2633,9 @@ public class HRegion implements HeapSize
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    doBatchMutate(p, lid);
+    doBatchMutate(p, null);
   }
- 
+
   /**
    * Atomically apply the given map of family->edits to the memstore.
    * This handles the consistency control on its own, but the caller
@@ -2896,7 +2829,7 @@ public class HRegion implements HeapSize
       }
     }
     long seqid = minSeqIdForTheRegion;
-    
+
     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
 
@@ -3219,13 +3152,8 @@ public class HRegion implements HeapSize
    * <pre>
    *   LOCKS ==> ROWS
    * </pre>
-   *
-   * But it acts as a guard on the client; a miswritten client just can't
-   * submit the name of a row and start writing to it; it must know the correct
-   * lockid, which matches the lock list in memory.
-   *
-   * <p>It would be more memory-efficient to assume a correctly-written client,
-   * which maybe we'll do in the future.
+   * <p>It would be more memory-efficient to just have one mapping;
+   * maybe we'll do that in the future.
    *
    * @param row Name of row to lock.
    * @throws IOException
@@ -3248,7 +3176,7 @@ public class HRegion implements HeapSize
    *        null if unavailable.
    */
   private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
-      throws IOException {
+  throws IOException {
     checkRow(row, "row lock");
     startRegionOperation();
     try {
@@ -3272,7 +3200,10 @@ public class HRegion implements HeapSize
                   + Bytes.toStringBinary(row));
             }
           } catch (InterruptedException ie) {
-            // Empty
+            LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
+            InterruptedIOException iie = new InterruptedIOException();
+            iie.initCause(ie);
+            throw iie;
           }
         }
       }
@@ -3294,16 +3225,6 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Used by unit tests.
-   * @param lockid
-   * @return Row that goes with <code>lockid</code>
-   */
-  byte[] getRowFromLock(final Integer lockid) {
-    HashedBytes rowKey = lockIds.get(lockid);
-    return rowKey == null ? null : rowKey.getBytes();
-  }
-
-  /**
    * Release the row lock!
    * @param lockId  The lock ID to release.
    */
@@ -3377,17 +3298,25 @@ public class HRegion implements HeapSize
     return multipleFamilies;
   }
 
+
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+                                boolean assignSeqId) throws IOException {
+    return bulkLoadHFiles(familyPaths, assignSeqId, null);
+  }
+
   /**
    * Attempts to atomically load a group of hfiles.  This is critical for loading
    * rows with multiple column families atomically.
    *
    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
+   * file about to be bulk loaded
    * @param assignSeqId
    * @return true if successful, false if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
-  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId) throws IOException {
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
+      BulkLoadListener bulkLoadListener) throws IOException {
     Preconditions.checkNotNull(familyPaths);
     // we need writeLock for multi-family bulk load
     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -3447,7 +3376,14 @@ public class HRegion implements HeapSize
         String path = p.getSecond();
         Store store = getStore(familyName);
         try {
-          store.bulkLoadHFile(path, assignSeqId ? this.log.obtainSeqNum() : -1);
+          String finalPath = path;
+          if(bulkLoadListener != null) {
+            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
+          }
+          store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
+          if(bulkLoadListener != null) {
+            bulkLoadListener.doneBulkLoad(familyName, path);
+          }
         } catch (IOException ioe) {
           // A failure here can cause an atomicity violation that we currently
           // cannot recover from since it is likely a failed HDFS operation.
@@ -3455,6 +3391,14 @@ public class HRegion implements HeapSize
           // TODO Need a better story for reverting partial failures due to HDFS.
           LOG.error("There was a partial failure due to IO when attempting to" +
               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
+          if(bulkLoadListener != null) {
+            try {
+              bulkLoadListener.failedBulkLoad(familyName, path);
+            } catch (Exception ex) {
+              LOG.error("Error while calling failedBulkLoad for family "+
+                  Bytes.toString(familyName)+" with path "+path, ex);
+            }
+          }
           throw ioe;
         }
       }
@@ -3493,30 +3437,41 @@ public class HRegion implements HeapSize
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
     KeyValueHeap storeHeap = null;
+    /** Heap of key-values that are not essential for the provided filters and are thus read
+     * on demand, if on-demand column family loading is enabled.*/
+    KeyValueHeap joinedHeap = null;
+    /**
+     * If the joined heap data gathering is interrupted due to scan limits, this will
+     * contain the row for which we are populating the values.*/
+    private KeyValue joinedContinuationRow = null;
+    // KeyValue indicating that limit is reached when scanning
+    private final KeyValue KV_LIMIT = new KeyValue();
     private final byte [] stopRow;
     private Filter filter;
-    private List<KeyValue> results = new ArrayList<KeyValue>();
     private int batch;
     private int isScan;
     private boolean filterClosed = false;
     private long readPt;
     private long maxResultSize;
+    private HRegion region;
 
     public HRegionInfo getRegionInfo() {
       return regionInfo;
     }
-    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
-      //DebugPrint.println("HRegionScanner.<init>");
-
+    
+    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
+        throws IOException {
+      // DebugPrint.println("HRegionScanner.<init>");
+      this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
         this.filter = new FilterWrapper(scan.getFilter());
       } else {
         this.filter = null;
       }
-      
+
       this.batch = scan.getBatch();
-      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
         this.stopRow = null;
       } else {
         this.stopRow = scan.getStopRow();
@@ -3539,7 +3494,10 @@ public class HRegion implements HeapSize
         scannerReadPoints.put(this, this.readPt);
       }
 
+      // Here we separate all scanners into two lists - scanner that provide data required
+      // by the filter to operate (scanners list) and all others (joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
@@ -3548,13 +3506,21 @@ public class HRegion implements HeapSize
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
         KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
+        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
+          || this.filter.isFamilyEssential(entry.getKey())) {
+          scanners.add(scanner);
+        } else {
+          joinedScanners.add(scanner);
+        }
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+      }
     }
 
-    RegionScannerImpl(Scan scan) throws IOException {
-      this(scan, null);
+    RegionScannerImpl(Scan scan, HRegion region) throws IOException {
+      this(scan, null, region);
     }
 
     @Override
@@ -3611,11 +3577,16 @@ public class HRegion implements HeapSize
     @Override
     public boolean nextRaw(List<KeyValue> outResults, int limit,
         String metric) throws IOException {
-      results.clear();
-
-      boolean returnResult = nextInternal(limit, metric);
-
-      outResults.addAll(results);
+      boolean returnResult;
+      if (outResults.isEmpty()) {
+        // Usually outResults is empty. This is true when next is called
+        // to handle scan or get operation.
+        returnResult = nextInternal(outResults, limit, metric);
+      } else {
+        List<KeyValue> tmpList = new ArrayList<KeyValue>();
+        returnResult = nextInternal(tmpList, limit, metric);
+        outResults.addAll(tmpList);
+      }
       resetFilters();
       if (isFilterDone()) {
         return false;
@@ -3637,6 +3608,46 @@ public class HRegion implements HeapSize
       return next(outResults, batch, metric);
     }
 
+    private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric) 
+        throws IOException {
+      assert joinedContinuationRow != null;
+      KeyValue kv = populateResult(results, this.joinedHeap, limit, 
+          joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(), 
+          joinedContinuationRow.getRowLength(), metric);
+      if (kv != KV_LIMIT) {
+        // We are done with this row, reset the continuation.
+        joinedContinuationRow = null;
+      }
+      // As the data is obtained from two independent heaps, we need to
+      // ensure that result list is sorted, because Result relies on that.
+      Collections.sort(results, comparator);
+    }
+
+    /**
+     * Fetches records with currentRow into results list, until next row or limit (if not -1).
+     * @param results
+     * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
+     * @param limit Max amount of KVs to place in result list, -1 means no limit.
+     * @param currentRow Byte array with key we are fetching.
+     * @param offset offset for currentRow
+     * @param length length for currentRow
+     * @param metric Metric key to be passed into KeyValueHeap::next().
+     * @return KV_LIMIT if limit reached, next KeyValue otherwise.
+     */
+    private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit, 
+        byte[] currentRow, int offset, short length, String metric) throws IOException {
+      KeyValue nextKv;
+      do {
+        heap.next(results, limit - results.size(), metric);
+        if (limit > 0 && results.size() == limit) {
+          return KV_LIMIT;
+        }
+        nextKv = heap.peek();
+      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+
+      return nextKv;
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -3644,8 +3655,17 @@ public class HRegion implements HeapSize
       return this.filter != null && this.filter.filterAllRemaining();
     }
 
-    private boolean nextInternal(int limit, String metric) throws IOException {
+    private boolean nextInternal(List<KeyValue> results, int limit, String metric)
+    throws IOException {
+      if (!results.isEmpty()) {
+        throw new IllegalArgumentException("First parameter should be an empty list");
+      }
       RpcCallContext rpcCall = HBaseServer.getCurrentCall();
+      // The loop here is used only when at some point during the next we determine
+      // that due to effects of filters or otherwise, we have an empty row in the result.
+      // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
+      // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
+      // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
       while (true) {
         if (rpcCall != null) {
           // If a user specifies a too-restrictive or too-slow scanner, the
@@ -3655,7 +3675,9 @@ public class HRegion implements HeapSize
           rpcCall.throwExceptionIfCallerDisconnected();
         }
 
+        // Let's see what we have in the storeHeap.
         KeyValue current = this.storeHeap.peek();
+
         byte[] currentRow = null;
         int offset = 0;
         short length = 0;
@@ -3664,52 +3686,96 @@ public class HRegion implements HeapSize
           offset = current.getRowOffset();
           length = current.getRowLength();
         }
-        if (isStopRow(currentRow, offset, length)) {
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
-          }
-          
-          return false;
-        } else if (filterRowKey(currentRow, offset, length)) {
-          nextRow(currentRow, offset, length);
-        } else {
-          KeyValue nextKv;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) {
-                throw new IncompatibleFilterException(
-                  "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
-              }
-              return true; // we are expecting more yes, but also limited to how many we can return.
+        boolean stopRow = isStopRow(currentRow, offset, length);
+        // Check if we were getting data from the joinedHeap and hit the limit.
+        // If not, then it's main path - getting results from storeHeap.
+        if (joinedContinuationRow == null) {
+          // First, check if we are at a stop row. If so, there are no more results.
+          if (stopRow) {
+            if (filter != null && filter.hasFilterRow()) {
+              filter.filterRow(results);
             }
-            nextKv = this.storeHeap.peek();
-          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+            return false;
+          }
+
+          // Check if rowkey filter wants to exclude this row. If so, loop to next.
+          // Technically, if we hit limits before on this row, we don't need this call.
+          if (filterRowKey(currentRow, offset, length)) {
+            boolean moreRows = nextRow(currentRow, offset, length);
+            if (!moreRows) return false;
+            results.clear();
+            continue;
+          }
 
-          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
+              length, metric);
+          // Ok, we are good, let's try to get some results from the main heap.
+          if (nextKv == KV_LIMIT) {
+            if (this.filter != null && filter.hasFilterRow()) {
+              throw new IncompatibleFilterException(
+                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
+            }
+            return true; // We hit the limit.
+          }
 
-          // now that we have an entire row, lets process with a filters:
+          stopRow = nextKv == null ||
+              isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          // Save whether the row was empty before filters were applied to it.
+          final boolean isEmptyRow = results.isEmpty();
 
-          // first filter with the filterRow(List)
+          // We have the part of the row necessary for filtering (all of it, usually).
+          // First filter with the filterRow(List).
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }
-
-          if (results.isEmpty()) {
-            // this seems like a redundant step - we already consumed the row
-            // there're no left overs.
-            // the reasons for calling this method are:
-            // 1. reset the filters.
-            // 2. provide a hook to fast forward the row (used by subclasses)
-            nextRow(currentRow, offset, length);
-
+          if (isEmptyRow) {
+            boolean moreRows = nextRow(currentRow, offset, length);
+            if (!moreRows) return false;
+            results.clear();
             // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on.
-
+            // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
+            return false;
           }
-          return !stopRow;
+
+          // Ok, we are done with storeHeap for this row.
+          // Now we may need to fetch additional, non-essential data into row.
+          // These values are not needed for filter to work, so we postpone their
+          // fetch to (possibly) reduce amount of data loads from disk.
+          if (this.joinedHeap != null) {
+            KeyValue nextJoinedKv = joinedHeap.peek();
+            // If joinedHeap is pointing to some other row, try to seek to a correct one.
+            // We don't need to recheck that row here - populateResult will take care of that.
+            boolean mayHaveData =
+              (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
+              || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+            if (mayHaveData) {
+              joinedContinuationRow = current;
+              populateFromJoinedHeap(results, limit, metric);
+            }
+          }
+        } else {
+          // Populating from the joined heap was stopped by limits, populate some more.
+          populateFromJoinedHeap(results, limit, metric);
+        }
+
+        // We may have just called populateFromJoinedHeap and hit the limits. If that is
+        // the case, we need to call it again on the next next() invocation.
+        if (joinedContinuationRow != null) {
+          return true;
         }
+
+        // Finally, we are done with both joinedHeap and storeHeap.
+        // Double check to prevent empty rows from appearing in result. It could be
+        // the case when SingleColumnValueExcludeFilter is used.
+        if (results.isEmpty()) {
+          boolean moreRows = nextRow(currentRow, offset, length);
+          if (!moreRows) return false;
+          if (!stopRow) continue;
+        }
+
+        // We are done. Return the result.
+        return !stopRow;
       }
     }
 
@@ -3718,20 +3784,25 @@ public class HRegion implements HeapSize
           && filter.filterRowKey(row, offset, length);
     }
 
-    protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+    protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
+      assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
       KeyValue next;
-      while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
-        this.storeHeap.next(MOCKED_LIST);       
+      while ((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
+        this.storeHeap.next(MOCKED_LIST);
       }
-      results.clear();
       resetFilters();
+      // Calling the hook in CP which allows it to do a fast forward
+      if (this.region.getCoprocessorHost() != null) {
+        return this.region.getCoprocessorHost().postScannerFilterRow(this, currentRow);
+      }
+      return true;
     }
 
     private boolean isStopRow(byte [] currentRow, int offset, short length) {
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,
-              currentRow, offset, length) <= isScan);
+            currentRow, offset, length) <= isScan);
     }
 
     @Override
@@ -3740,6 +3811,10 @@ public class HRegion implements HeapSize
         storeHeap.close();
         storeHeap = null;
       }
+      if (joinedHeap != null) {
+        joinedHeap.close();
+        joinedHeap = null;
+      }
       // no need to sychronize here.
       scannerReadPoints.remove(this);
       this.filterClosed = true;
@@ -3754,16 +3829,21 @@ public class HRegion implements HeapSize
       if (row == null) {
         throw new IllegalArgumentException("Row cannot be null.");
       }
+      boolean result = false;
       startRegionOperation();
       try {
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
         KeyValue kv = KeyValue.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
-        return this.storeHeap.requestSeek(kv, true, true);
+        result = this.storeHeap.requestSeek(kv, true, true);
+        if (this.joinedHeap != null) {
+          result = this.joinedHeap.requestSeek(kv, true, true) || result;
+        }
       } finally {
         closeRegionOperation();
       }
+      return result;
     }
   }
 
@@ -3905,6 +3985,8 @@ public class HRegion implements HeapSize
     Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
+    // Write HRI to a file in case we need to recover .META.
+    writeRegioninfoOnFilesystem(info, regionDir, fs, conf);
     HLog effectiveHLog = hlog;
     if (hlog == null && !ignoreHLog) {
       effectiveHLog = HLogFactory.createHLog(fs, regionDir,
@@ -3985,15 +4067,15 @@ public class HRegion implements HeapSize
     return r.openHRegion(reporter);
   }
 
-  public static HRegion openHRegion(Path tableDir, final HRegionInfo info,
+  public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
       final HTableDescriptor htd, final HLog wal, final Configuration conf)
   throws IOException {
-    return openHRegion(tableDir, info, htd, wal, conf, null, null);
+    return openHRegion(rootDir, info, htd, wal, conf, null, null);
   }
 
   /**
    * Open a Region.
-   * @param tableDir Table directory
+   * @param rootDir Root directory for HBase instance
    * @param info Info for region to be opened.
    * @param wal HLog for region to use. This method will call
    * HLog#setSequenceNumber(long) passing the result of the call to
@@ -4005,7 +4087,7 @@ public class HRegion implements HeapSize
    *
    * @throws IOException
    */
-  public static HRegion openHRegion(final Path tableDir, final HRegionInfo info,
+  public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
       final HTableDescriptor htd, final HLog wal, final Configuration conf,
       final RegionServerServices rsServices,
       final CancelableProgressable reporter)
@@ -4015,7 +4097,7 @@ public class HRegion implements HeapSize
     if (LOG.isDebugEnabled()) {
       LOG.debug("Opening region: " + info);
     }
-    Path dir = HTableDescriptor.getTableDir(tableDir, info.getTableName());
+    Path dir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
     HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info, htd, rsServices);
     return r.openHRegion(reporter);
   }
@@ -4032,10 +4114,11 @@ public class HRegion implements HeapSize
   throws IOException {
     checkCompressionCodecs();
 
-    long seqid = initialize(reporter);
+    this.openSeqNum = initialize(reporter);
     if (this.log != null) {
-      this.log.setSequenceNumber(seqid);
+      this.log.setSequenceNumber(this.openSeqNum);
     }
+
     return this;
   }
 
@@ -4061,21 +4144,16 @@ public class HRegion implements HeapSize
     meta.checkResources();
     // The row key is the region name
     byte[] row = r.getRegionName();
-    Integer lid = meta.obtainRowLock(row);
-    try {
-      final long now = EnvironmentEdgeManager.currentTimeMillis();
-      final List<KeyValue> edits = new ArrayList<KeyValue>(2);
-      edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
-        HConstants.REGIONINFO_QUALIFIER, now,
-        r.getRegionInfo().toByteArray()));
-      // Set into the root table the version of the meta table.
-      edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
-        HConstants.META_VERSION_QUALIFIER, now,
-        Bytes.toBytes(HConstants.META_VERSION)));
-      meta.put(row, HConstants.CATALOG_FAMILY, edits, lid);
-    } finally {
-      meta.releaseRowLock(lid);
-    }
+    final long now = EnvironmentEdgeManager.currentTimeMillis();
+    final List<KeyValue> edits = new ArrayList<KeyValue>(2);
+    edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
+      HConstants.REGIONINFO_QUALIFIER, now,
+      r.getRegionInfo().toByteArray()));
+    // Set into the root table the version of the meta table.
+    edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
+      HConstants.META_VERSION_QUALIFIER, now,
+      Bytes.toBytes(HConstants.META_VERSION)));
+    meta.put(row, HConstants.CATALOG_FAMILY, edits);
   }
 
   /**
@@ -4206,12 +4284,12 @@ public class HRegion implements HeapSize
     a.compactStores(true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for region: " + a);
-      listPaths(fs, a.getRegionDir());
+      FSUtils.logFileSystemState(fs, a.getRegionDir(), LOG);
     }
     b.compactStores(true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for region: " + b);
-      listPaths(fs, b.getRegionDir());
+      FSUtils.logFileSystemState(fs, b.getRegionDir(), LOG);
     }
 
     Configuration conf = a.baseConf;
@@ -4270,16 +4348,6 @@ public class HRegion implements HeapSize
       // Because we compacted the source regions we should have no more than two
       // HStoreFiles per family and there will be no reference store
       List<StoreFile> srcFiles = es.getValue();
-      if (srcFiles.size() == 2) {
-        long seqA = srcFiles.get(0).getMaxSequenceId();
-        long seqB = srcFiles.get(1).getMaxSequenceId();
-        if (seqA == seqB) {
-          // Can't have same sequenceid since on open of a store, this is what
-          // distingushes the files (see the map of stores how its keyed by
-          // sequenceid).
-          throw new IOException("Files have same sequenceid: " + seqA);
-        }
-      }
       for (StoreFile hsf: srcFiles) {
         StoreFile.rename(fs, hsf.getPath(),
           StoreFile.getUniqueFile(fs, HStore.getStoreHomedir(tableDir,
@@ -4288,7 +4356,7 @@ public class HRegion implements HeapSize
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for new region");
-      listPaths(fs, newRegionDir);
+      FSUtils.logFileSystemState(fs, newRegionDir, LOG);
     }
     HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
         newRegionInfo, a.getTableDesc(), null);
@@ -4302,15 +4370,15 @@ public class HRegion implements HeapSize
     dstRegion.compactStores();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for new region");
-      listPaths(fs, dstRegion.getRegionDir());
+      FSUtils.logFileSystemState(fs, dstRegion.getRegionDir(), LOG);
     }
 
     // delete out the 'A' region
-    HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getBaseConf()),
-        a.getTableDir(), a.getRegionDir());
+    HFileArchiver.archiveRegion(a.getBaseConf(), fs,
+      FSUtils.getRootDir(a.getBaseConf()), a.getTableDir(), a.getRegionDir());
     // delete out the 'B' region
-    HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getBaseConf()),
-        b.getTableDir(), b.getRegionDir());
+    HFileArchiver.archiveRegion(a.getBaseConf(), fs,
+      FSUtils.getRootDir(b.getBaseConf()), b.getTableDir(), b.getRegionDir());
 
     LOG.info("merge completed. New region is " + dstRegion);
 
@@ -4351,42 +4419,15 @@ public class HRegion implements HeapSize
     return false;
   }
 
-  /*
-   * List the files under the specified directory
-   *
-   * @param fs
-   * @param dir
-   * @throws IOException
-   */
-  private static void listPaths(FileSystem fs, Path dir) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      FileStatus[] stats = FSUtils.listStatus(fs, dir, null);
-      if (stats == null || stats.length == 0) {
-        return;
-      }
-      for (int i = 0; i < stats.length; i++) {
-        String path = stats[i].getPath().toString();
-        if (stats[i].isDir()) {
-          LOG.debug("d " + path);
-          listPaths(fs, stats[i].getPath());
-        } else {
-          LOG.debug("f " + path + " size=" + stats[i].getLen());
-        }
-      }
-    }
-  }
-
-
   //
   // HBASE-880
   //
   /**
    * @param get get object
-   * @param lockid existing lock id, or null for no previous lock
    * @return result
    * @throws IOException read exceptions
    */
-  public Result get(final Get get, final Integer lockid) throws IOException {
+  public Result get(final Get get) throws IOException {
     checkRow(get.getRow(), "Get");
     // Verify families are all valid
     if (get.hasFamilies()) {
@@ -4677,12 +4718,11 @@ public class HRegion implements HeapSize
    * Perform one or more append operations on a row.
    *
    * @param append
-   * @param lockid
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
    */
-  public Result append(Append append, Integer lockid, boolean writeToWAL)
+  public Result append(Append append, boolean writeToWAL)
       throws IOException {
     byte[] row = append.getRow();
     checkRow(row, "append");
@@ -4699,7 +4739,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(lockid, row, true);
+      Integer lid = getLock(null, row, true);
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -4841,13 +4881,11 @@ public class HRegion implements HeapSize
   /**
    * Perform one or more increment operations on a row.
    * @param increment
-   * @param lockid
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
    */
-  public Result increment(Increment increment, Integer lockid,
-      boolean writeToWAL)
+  public Result increment(Increment increment, boolean writeToWAL)
   throws IOException {
     byte [] row = increment.getRow();
     checkRow(row, "increment");
@@ -4865,7 +4903,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     WriteEntry w = null;
     try {
-      Integer lid = getLock(lockid, row, true);
+      Integer lid = getLock(null, row, true);
       lock(this.updatesLock.readLock());
       // wait for all prior MVCC transactions to finish - while we hold the row lock
       // (so that we are guaranteed to see the latest state)
@@ -4993,8 +5031,8 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (9 * Bytes.SIZEOF_LONG) +
+      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (10 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5035,47 +5073,6 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Registers a new CoprocessorProtocol subclass and instance to
-   * be available for handling {@link HRegion#exec(Exec)} calls.
-   *
-   * <p>
-   * Only a single protocol type/handler combination may be registered per
-   * region.
-   * After the first registration, subsequent calls with the same protocol type
-   * will fail with a return value of {@code false}.
-   * </p>
-   * @param protocol a {@code CoprocessorProtocol} subinterface defining the
-   * protocol methods
-   * @param handler an instance implementing the interface
-   * @param <T> the protocol type
-   * @return {@code true} if the registration was successful, {@code false}
-   * otherwise
-   */
-  @Deprecated
-  public <T extends CoprocessorProtocol> boolean registerProtocol(
-      Class<T> protocol, T handler) {
-
-    /* No stacking of protocol handlers is currently allowed.  The
-     * first to claim wins!
-     */
-    if (protocolHandlers.containsKey(protocol)) {
-      LOG.error("Protocol "+protocol.getName()+
-          " already registered, rejecting request from "+
-          handler
-      );
-      return false;
-    }
-
-    protocolHandlers.putInstance(protocol, handler);
-    protocolHandlerNames.put(protocol.getName(), protocol);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Registered protocol handler: region="+
-          Bytes.toStringBinary(getRegionName())+" protocol="+protocol.getName());
-    }
-    return true;
-  }
-
-  /**
    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
    * be available for handling
    * {@link HRegion#execService(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls.
@@ -5111,73 +5108,6 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
-   * method using the registered protocol handlers.
-   * {@link CoprocessorProtocol} implementations must be registered via the
-   * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
-   * method before they are available.
-   *
-   * @param call an {@code Exec} instance identifying the protocol, method name,
-   *     and parameters for the method invocation
-   * @return an {@code ExecResult} instance containing the region name of the
-   *     invocation and the return value
-   * @throws IOException if no registered protocol handler is found or an error
-   *     occurs during the invocation
-   * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
-   */
-  @Deprecated
-  public ExecResult exec(Exec call)
-      throws IOException {
-    Class<? extends CoprocessorProtocol> protocol = call.getProtocol();
-    if (protocol == null) {
-      String protocolName = call.getProtocolName();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Received dynamic protocol exec call with protocolName " + protocolName);
-      }
-      // detect the actual protocol class
-      protocol  = protocolHandlerNames.get(protocolName);
-      if (protocol == null) {
-        throw new HBaseRPC.UnknownProtocolException(null,
-            "No matching handler for protocol "+protocolName+
-            " in region "+Bytes.toStringBinary(getRegionName()));
-      }
-    }
-    if (!protocolHandlers.containsKey(protocol)) {
-      throw new HBaseRPC.UnknownProtocolException(protocol,
-          "No matching handler for protocol "+protocol.getName()+
-          " in region "+Bytes.toStringBinary(getRegionName()));
-    }
-
-    CoprocessorProtocol handler = protocolHandlers.getInstance(protocol);
-    Object value;
-
-    try {
-      Method method = protocol.getMethod(
-          call.getMethodName(), call.getParameterClasses());
-      method.setAccessible(true);
-
-      value = method.invoke(handler, call.getParameters());
-    } catch (InvocationTargetException e) {
-      Throwable target = e.getTargetException();
-      if (target instanceof IOException) {
-        throw (IOException)target;
-      }
-      IOException ioe = new IOException(target.toString());
-      ioe.setStackTrace(target.getStackTrace());
-      throw ioe;
-    } catch (Throwable e) {
-      if (!(e instanceof IOException)) {
-        LOG.error("Unexpected throwable object ", e);
-      }
-      IOException ioe = new IOException(e.toString());
-      ioe.setStackTrace(e.getStackTrace());
-      throw ioe;
-    }
-
-    return new ExecResult(getRegionName(), value);
-  }
-
-  /**
    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
    * the registered protocol handlers.  {@link Service} implementations must be registered via the
    * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)}
@@ -5196,7 +5126,7 @@ public class HRegion implements HeapSize
     String serviceName = call.getServiceName();
     String methodName = call.getMethodName();
     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
-      throw new HBaseRPC.UnknownProtocolException(null,
+      throw new UnknownProtocolException(null,
           "No registered coprocessor service found for name "+serviceName+
           " in region "+Bytes.toStringBinary(getRegionName()));
     }
@@ -5205,7 +5135,7 @@ public class HRegion implements HeapSize
     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
     if (methodDesc == null) {
-      throw new HBaseRPC.UnknownProtocolException(service.getClass(),
+      throw new UnknownProtocolException(service.getClass(),
           "Unknown method "+methodName+" called on service "+serviceName+
               " in region "+Bytes.toStringBinary(getRegionName()));
     }
@@ -5564,4 +5494,44 @@ public class HRegion implements HeapSize
        if (bc != null) bc.shutdown();
     }
   }
+
+  /**
+   * Gets the latest sequence number that was read from storage when this region was opened.
+   */
+  public long getOpenSeqNum() {
+    return this.openSeqNum;
+  }
+
+  /**
+   * Listener class to enable callers of
+   * bulkLoadHFile() to perform any necessary
+   * pre/post processing of a given bulkload call
+   */
+  public static interface BulkLoadListener {
+
+    /**
+     * Called before an HFile is actually loaded
+     * @param family family being loaded to
+     * @param srcPath path of HFile
+     * @return final path to be used for actual loading
+     * @throws IOException
+     */
+    String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
+
+    /**
+     * Called after a successful HFile load
+     * @param family family being loaded to
+     * @param srcPath path of HFile
+     * @throws IOException
+     */
+    void doneBulkLoad(byte[] family, String srcPath) throws IOException;
+
+    /**
+     * Called after a failed HFile load
+     * @param family family being loaded to
+     * @param srcPath path of HFile
+     * @throws IOException
+     */
+    void failedBulkLoad(byte[] family, String srcPath) throws IOException;
+  }
 }



Mime
View raw message