hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjus...@apache.org
Subject svn commit: r1546878 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-common/src/main/java/...
Date Mon, 02 Dec 2013 02:20:36 GMT
Author: zjushch
Date: Mon Dec  2 02:20:35 2013
New Revision: 1546878

URL: http://svn.apache.org/r1546878
Log:
HBASE-4811 Support reverse Scan

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Mon Dec  2 02:20:35 2013
@@ -54,7 +54,7 @@ public class ClientScanner extends Abstr
     // Current region scanner is against.  Gets cleared if current region goes
     // wonky: e.g. if it splits on us.
     protected HRegionInfo currentRegion = null;
-    private ScannerCallable callable = null;
+    protected ScannerCallable callable = null;
     protected final LinkedList<Result> cache = new LinkedList<Result>();
     protected final int caching;
     protected long lastNext;
@@ -219,7 +219,7 @@ public class ClientScanner extends Abstr
      * @param nbRows
      * @param done Server-side says we're done scanning.
      */
-    private boolean nextScanner(int nbRows, final boolean done)
+  protected boolean nextScanner(int nbRows, final boolean done)
     throws IOException {
       // Close the previous scanner if it's open
       if (this.callable != null) {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Dec  2 02:20:35 2013
@@ -713,9 +713,12 @@ public class HTable implements HTableInt
     if (scan.getCaching() <= 0) {
       scan.setCaching(getScannerCaching());
     }
-    if (scan.isSmall()) {
+    if (scan.isSmall() && !scan.isReversed()) {
       return new ClientSmallScanner(getConfiguration(), scan, getName(),
           this.connection);
+    } else if (scan.isReversed()) {
+      return new ReversedClientScanner(getConfiguration(), scan, getName(),
+          this.connection);
     }
     return new ClientScanner(getConfiguration(), scan,
         getName(), this.connection);

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java Mon Dec  2 02:20:35 2013
@@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.util.Byte
 public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   // Public because used outside of this package over in ipc.
   static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
-  private final HConnection connection;
-  private final TableName tableName;
-  private final byte [] row;
-  private HRegionLocation location;
+  protected final HConnection connection;
+  protected final TableName tableName;
+  protected final byte[] row;
+  protected HRegionLocation location;
   private ClientService.BlockingInterface stub;
 
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,171 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A reversed client scanner which support backward scanning
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReversedClientScanner extends ClientScanner {
+  private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
+  // A byte array in which all elements are the max byte, and it is used to
+  // construct closest front row
+  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+  /**
+   * Create a new ReversibleClientScanner for the specified table Note that the
+   * passed {@link Scan}'s start row maybe changed.
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @throws IOException
+   */
+  public ReversedClientScanner(Configuration conf, Scan scan,
+      TableName tableName, HConnection connection) throws IOException {
+    super(conf, scan, tableName, connection);
+  }
+
+  @Override
+  protected boolean nextScanner(int nbRows, final boolean done)
+      throws IOException {
+    // Close the previous scanner if it's open
+    if (this.callable != null) {
+      this.callable.setClose();
+      this.caller.callWithRetries(callable);
+      this.callable = null;
+    }
+
+    // Where to start the next scanner
+    byte[] localStartKey;
+    boolean locateTheClosestFrontRow = true;
+    // if we're at start of table, close and return false to stop iterating
+    if (this.currentRegion != null) {
+      byte[] startKey = this.currentRegion.getStartKey();
+      if (startKey == null
+          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
+          || checkScanStopRow(startKey) || done) {
+        close();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Finished " + this.currentRegion);
+        }
+        return false;
+      }
+      localStartKey = startKey;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Finished " + this.currentRegion);
+      }
+    } else {
+      localStartKey = this.scan.getStartRow();
+      if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) {
+        locateTheClosestFrontRow = false;
+      }
+    }
+
+    if (LOG.isDebugEnabled() && this.currentRegion != null) {
+      // Only worth logging if NOT first region in scan.
+      LOG.debug("Advancing internal scanner to startKey at '"
+          + Bytes.toStringBinary(localStartKey) + "'");
+    }
+    try {
+      // In reversed scan, we want to locate the previous region through current
+      // region's start key. In order to get that previous region, first we
+      // create a closest row before the start key of current region, then
+      // locate all the regions from the created closest row to start key of
+      // current region, thus the last one of located regions should be the
+      // previous region of current region. The related logic of locating
+      // regions is implemented in ReversedScannerCallable
+      byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
+          : null;
+      callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
+      // Open a scanner on the region server starting at the
+      // beginning of the region
+      this.caller.callWithRetries(callable);
+      this.currentRegion = callable.getHRegionInfo();
+      if (this.scanMetrics != null) {
+        this.scanMetrics.countOfRegions.incrementAndGet();
+      }
+    } catch (IOException e) {
+      close();
+      throw e;
+    }
+    return true;
+  }
+  
+  protected ScannerCallable getScannerCallable(byte[] localStartKey,
+      int nbRows, byte[] locateStartRow) {
+    scan.setStartRow(localStartKey);
+    ScannerCallable s = new ReversedScannerCallable(getConnection(),
+        getTable(), scan, this.scanMetrics, locateStartRow);
+    s.setCaching(nbRows);
+    return s;
+  }
+
+  @Override
+  // returns true if stopRow >= passed region startKey
+  protected boolean checkScanStopRow(final byte[] startKey) {
+    if (this.scan.getStopRow().length > 0) {
+      // there is a stop row, check to see if we are past it.
+      byte[] stopRow = scan.getStopRow();
+      int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0,
+          startKey.length);
+      if (cmp >= 0) {
+        // stopRow >= startKey (stopRow is equals to or larger than endKey)
+        // This is a stop.
+        return true;
+      }
+    }
+    return false; // unlikely.
+  }
+
+  /**
+   * Create the closest row before the specified row
+   * @param row
+   * @return a new byte array which is the closest front row of the specified one
+   */
+  private byte[] createClosestRowBefore(byte[] row) {
+    if (row == null) {
+      throw new IllegalArgumentException("The passed row is empty");
+    }
+    if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
+      return MAX_BYTE_ARRAY;
+    }
+    if (row[row.length - 1] == 0) {
+      return Arrays.copyOf(row, row.length - 1);
+    } else {
+      byte[] closestFrontRow = Arrays.copyOf(row, row.length);
+      closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
+      closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
+      return closestFrontRow;
+    }
+  }
+
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,141 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A reversed ScannerCallable which supports backward scanning.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReversedScannerCallable extends ScannerCallable {
+  /**
+   * The start row for locating regions. In reversed scanner, may locate the
+   * regions for a range of keys when doing
+   * {@link ReversedClientScanner#nextScanner(int, boolean)}
+   */
+  protected final byte[] locateStartRow;
+
+  /**
+   * 
+   * @param connection
+   * @param tableName
+   * @param scan
+   * @param scanMetrics
+   * @param locateStartRow The start row for locating regions
+   */
+  public ReversedScannerCallable(HConnection connection, TableName tableName,
+      Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
+    super(connection, tableName, scan, scanMetrics);
+    this.locateStartRow = locateStartRow;
+  }
+
+  /**
+   * @param reload force reload of server location
+   * @throws IOException
+   */
+  @Override
+  public void prepare(boolean reload) throws IOException {
+    if (!instantiated || reload) {
+      if (locateStartRow == null) {
+        // Just locate the region with the row
+        this.location = connection.getRegionLocation(tableName, row, reload);
+        if (this.location == null) {
+          throw new IOException("Failed to find location, tableName="
+              + tableName + ", row=" + Bytes.toString(row) + ", reload="
+              + reload);
+        }
+      } else {
+        // Need to locate the regions with the range, and the target location is
+        // the last one which is the previous region of last region scanner
+        List<HRegionLocation> locatedRegions = locateRegionsInRange(
+            locateStartRow, row, reload);
+        if (locatedRegions.isEmpty()) {
+          throw new DoNotRetryIOException(
+              "Does hbase:meta exist hole? Couldn't get regions for the range from "
+                  + Bytes.toStringBinary(locateStartRow) + " to "
+                  + Bytes.toStringBinary(row));
+        }
+        this.location = locatedRegions.get(locatedRegions.size() - 1);
+      }
+      setStub(getConnection().getClient(getLocation().getServerName()));
+      checkIfRegionServerIsRemote();
+      instantiated = true;
+    }
+
+    // check how often we retry.
+    // HConnectionManager will call instantiateServer with reload==true
+    // if and only if for retries.
+    if (reload && this.scanMetrics != null) {
+      this.scanMetrics.countOfRPCRetries.incrementAndGet();
+      if (isRegionServerRemote) {
+        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+      }
+    }
+  }
+
+  /**
+   * Get the corresponding regions for an arbitrary range of keys.
+   * @param startKey Starting row in range, inclusive
+   * @param endKey Ending row in range, exclusive
+   * @param reload force reload of server location
+   * @return A list of HRegionLocation corresponding to the regions that contain
+   *         the specified range
+   * @throws IOException
+   */
+  private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
+      byte[] endKey, boolean reload) throws IOException {
+    final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
+        HConstants.EMPTY_END_ROW);
+    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
+      throw new IllegalArgumentException("Invalid range: "
+          + Bytes.toStringBinary(startKey) + " > "
+          + Bytes.toStringBinary(endKey));
+    }
+    List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
+    byte[] currentKey = startKey;
+    do {
+      HRegionLocation regionLocation = connection.getRegionLocation(tableName,
+          currentKey, reload);
+      if (regionLocation.getRegionInfo().containsRow(currentKey)) {
+        regionList.add(regionLocation);
+      } else {
+        throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
+            + Bytes.toStringBinary(currentKey) + " returns incorrect region "
+            + regionLocation.getRegionInfo());
+      }
+      currentKey = regionLocation.getRegionInfo().getEndKey();
+    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
+        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
+    return regionList;
+  }
+
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java Mon Dec  2 02:20:35 2013
@@ -110,6 +110,7 @@ public class Scan extends Query {
   private int caching = -1;
   private long maxResultSize = -1;
   private boolean cacheBlocks = true;
+  private boolean reversed = false;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
@@ -550,6 +551,27 @@ public class Scan extends Query {
   }
 
   /**
+   * Set whether this scan is a reversed one
+   * <p>
+   * This is false by default which means forward(normal) scan.
+   * 
+   * @param reversed if true, scan will be backward order
+   * @return this
+   */
+  public Scan setReversed(boolean reversed) {
+    this.reversed = reversed;
+    return this;
+  }
+
+  /**
+   * Get whether this scan is a reversed one.
+   * @return true if backward scan, false if forward(default) scan
+   */
+  public boolean isReversed() {
+    return reversed;
+  }
+
+  /**
    * Set the value indicating whether loading CFs on demand should be allowed (cluster
    * default is false). On-demand CF loading doesn't load column families until necessary, e.g.
    * if you filter on one column, the other column family data will be loaded only for the rows

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Mon Dec  2 02:20:35 2013
@@ -62,11 +62,11 @@ public class ScannerCallable extends Reg
 
   public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
   private long scannerId = -1L;
-  private boolean instantiated = false;
+  protected boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
   private int caching = 1;
-  private ScanMetrics scanMetrics;
+  protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
   private static String myAddress;
@@ -79,7 +79,7 @@ public class ScannerCallable extends Reg
   }
 
   // indicate if it is a remote server call
-  private boolean isRegionServerRemote = true;
+  protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
   
   /**
@@ -135,7 +135,7 @@ public class ScannerCallable extends Reg
    * compare the local machine hostname with region server's hostname
    * to decide if hbase client connects to a remote region server
    */
-  private void checkIfRegionServerIsRemote() {
+  protected void checkIfRegionServerIsRemote() {
     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
       isRegionServerRemote = false;
     } else {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Mon Dec  2 02:20:35 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.exception
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class Filter {
+  protected boolean reversed;
   /**
    * Reset the state of the filter between rows.
    * 
@@ -277,4 +278,16 @@ public abstract class Filter {
    * @throws IOException in case an I/O or an filter specific failure needs to be signaled.
    */
   abstract boolean areSerializedFieldsEqual(Filter other);
+
+  /**
+   * alter the reversed scan flag
+   * @param reversed flag
+   */
+  public void setReversed(boolean reversed) {
+    this.reversed = reversed;
+  }
+
+  public boolean isReversed() {
+    return this.reversed;
+  }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Mon Dec  2 02:20:35 2013
@@ -159,6 +159,11 @@ final public class FilterList extends Fi
    * @param filter another filter
    */
   public void addFilter(Filter filter) {
+    if (this.isReversed() != filter.isReversed()) {
+      throw new IllegalArgumentException(
+          "Filters in the list must have the same reversed flag, this.reversed="
+              + this.isReversed());
+    }
     this.filters.add(filter);
   }
 
@@ -464,6 +469,14 @@ final public class FilterList extends Fi
   }
 
   @Override
+  public void setReversed(boolean reversed) {
+    for (Filter filter : filters) {
+      filter.setReversed(reversed);
+    }
+    this.reversed = reversed;
+  }
+
+  @Override
   public String toString() {
     return toString(MAX_LOG_FILTERS);
   }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java Mon Dec  2 02:20:35 2013
@@ -59,7 +59,7 @@ public class PrefixFilter extends Filter
     // if we are passed the prefix, set flag
     int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0,
         this.prefix.length);
-    if(cmp > 0) {
+    if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) {
       passedPrefix = true;
     }
     filterRow = (cmp != 0);

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Mon Dec  2 02:20:35 2013
@@ -850,6 +850,9 @@ public final class ProtobufUtil {
     if (scan.getRowOffsetPerColumnFamily() > 0) {
       scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily());
     }
+    if (scan.isReversed()) {
+      scanBuilder.setReversed(scan.isReversed());
+    }
     return scanBuilder.build();
   }
 
@@ -926,6 +929,9 @@ public final class ProtobufUtil {
         }
       }
     }
+    if (proto.hasReversed()) {
+      scan.setReversed(proto.getReversed());
+    }
     return scan;
   }
 

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Mon Dec  2 02:20:35 2013
@@ -1986,6 +1986,37 @@ public class Bytes {
   }
 
   /**
+   * Create a max byte array with the specified max byte count
+   * @param maxByteCount the length of returned byte array
+   * @return the created max byte array
+   */
+  public static byte[] createMaxByteArray(int maxByteCount) {
+    byte[] maxByteArray = new byte[maxByteCount];
+    for (int i = 0; i < maxByteArray.length; i++) {
+      maxByteArray[i] = (byte) 0xff;
+    }
+    return maxByteArray;
+  }
+
+  /**
+   * Create a byte array which is multiple given bytes
+   * @param srcBytes
+   * @param multiNum
+   * @return byte array
+   */
+  public static byte[] multiple(byte[] srcBytes, int multiNum) {
+    if (multiNum <= 0) {
+      return new byte[0];
+    }
+    byte[] result = new byte[srcBytes.length * multiNum];
+    for (int i = 0; i < multiNum; i++) {
+      System.arraycopy(srcBytes, 0, result, i * srcBytes.length,
+        srcBytes.length);
+    }
+    return result;
+  }
+  
+  /**
    * Convert a byte array into a hex string
    * @param b
    */

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Mon Dec  2 02:20:35 2013
@@ -13330,6 +13330,16 @@ public final class ClientProtos {
      * <code>optional bool small = 14;</code>
      */
     boolean getSmall();
+
+    // optional bool reversed = 15 [default = false];
+    /**
+     * <code>optional bool reversed = 15 [default = false];</code>
+     */
+    boolean hasReversed();
+    /**
+     * <code>optional bool reversed = 15 [default = false];</code>
+     */
+    boolean getReversed();
   }
   /**
    * Protobuf type {@code Scan}
@@ -13485,6 +13495,11 @@ public final class ClientProtos {
               small_ = input.readBool();
               break;
             }
+            case 120: {
+              bitField0_ |= 0x00001000;
+              reversed_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -13815,6 +13830,22 @@ public final class ClientProtos {
       return small_;
     }
 
+    // optional bool reversed = 15 [default = false];
+    public static final int REVERSED_FIELD_NUMBER = 15;
+    private boolean reversed_;
+    /**
+     * <code>optional bool reversed = 15 [default = false];</code>
+     */
+    public boolean hasReversed() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    /**
+     * <code>optional bool reversed = 15 [default = false];</code>
+     */
+    public boolean getReversed() {
+      return reversed_;
+    }
+
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -13830,6 +13861,7 @@ public final class ClientProtos {
       storeOffset_ = 0;
       loadColumnFamiliesOnDemand_ = false;
       small_ = false;
+      reversed_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -13903,6 +13935,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000800) == 0x00000800)) {
         output.writeBool(14, small_);
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeBool(15, reversed_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -13968,6 +14003,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(14, small_);
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(15, reversed_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -14055,6 +14094,11 @@ public final class ClientProtos {
         result = result && (getSmall()
             == other.getSmall());
       }
+      result = result && (hasReversed() == other.hasReversed());
+      if (hasReversed()) {
+        result = result && (getReversed()
+            == other.getReversed());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -14124,6 +14168,10 @@ public final class ClientProtos {
         hash = (37 * hash) + SMALL_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getSmall());
       }
+      if (hasReversed()) {
+        hash = (37 * hash) + REVERSED_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getReversed());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -14292,6 +14340,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00001000);
         small_ = false;
         bitField0_ = (bitField0_ & ~0x00002000);
+        reversed_ = false;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
 
@@ -14394,6 +14444,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000800;
         }
         result.small_ = small_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.reversed_ = reversed_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -14498,6 +14552,9 @@ public final class ClientProtos {
         if (other.hasSmall()) {
           setSmall(other.getSmall());
         }
+        if (other.hasReversed()) {
+          setReversed(other.getReversed());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -15609,6 +15666,39 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional bool reversed = 15 [default = false];
+      private boolean reversed_ ;
+      /**
+       * <code>optional bool reversed = 15 [default = false];</code>
+       */
+      public boolean hasReversed() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      /**
+       * <code>optional bool reversed = 15 [default = false];</code>
+       */
+      public boolean getReversed() {
+        return reversed_;
+      }
+      /**
+       * <code>optional bool reversed = 15 [default = false];</code>
+       */
+      public Builder setReversed(boolean value) {
+        bitField0_ |= 0x00004000;
+        reversed_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool reversed = 15 [default = false];</code>
+       */
+      public Builder clearReversed() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        reversed_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:Scan)
     }
 

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Mon Dec  2 02:20:35 2013
@@ -232,6 +232,7 @@ message Scan {
   optional uint32 store_offset = 12;
   optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
   optional bool small = 14;
+  optional bool reversed = 15 [default = false];
 }
 
 /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Dec  2 02:20:35 2013
@@ -1220,6 +1220,13 @@ public class HRegion implements HeapSize
     return size;
   }
 
+  /**
+   * @return KeyValue Comparator
+   */
+  public KeyValue.KVComparator getComparator() {
+    return this.comparator;
+  }
+
   /*
    * Do preparation for pending compaction.
    * @throws IOException
@@ -1769,6 +1776,12 @@ public class HRegion implements HeapSize
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
+    if (scan.isReversed()) {
+      if (scan.getFilter() != null) {
+        scan.getFilter().setReversed(true);
+      }
+      return new ReversedRegionScannerImpl(scan, additionalScanners, this);
+    }
     return new RegionScannerImpl(scan, additionalScanners, this);
   }
 
@@ -3508,17 +3521,17 @@ public class HRegion implements HeapSize
     /**
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
-    private KeyValue joinedContinuationRow = null;
+    protected KeyValue joinedContinuationRow = null;
     // KeyValue indicating that limit is reached when scanning
     private final KeyValue KV_LIMIT = new KeyValue();
-    private final byte [] stopRow;
+    protected final byte[] stopRow;
     private Filter filter;
     private int batch;
-    private int isScan;
+    protected int isScan;
     private boolean filterClosed = false;
     private long readPt;
     private long maxResultSize;
-    private HRegion region;
+    protected HRegion region;
 
     @Override
     public HRegionInfo getRegionInfo() {
@@ -3573,16 +3586,22 @@ public class HRegion implements HeapSize
           joinedScanners.add(scanner);
         }
       }
-      this.storeHeap = new KeyValueHeap(scanners, comparator);
-      if (!joinedScanners.isEmpty()) {
-        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
-      }
+      initializeKVHeap(scanners, joinedScanners, region);
     }
 
     RegionScannerImpl(Scan scan, HRegion region) throws IOException {
       this(scan, null, region);
     }
 
+    protected void initializeKVHeap(List<KeyValueScanner> scanners,
+        List<KeyValueScanner> joinedScanners, HRegion region)
+        throws IOException {
+      this.storeHeap = new KeyValueHeap(scanners, region.comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
+      }
+    }
+
     @Override
     public long getMaxResultSize() {
       return maxResultSize;
@@ -3854,7 +3873,7 @@ public class HRegion implements HeapSize
                                                                    currentRow);
     }
 
-    private boolean isStopRow(byte [] currentRow, int offset, short length) {
+    protected boolean isStopRow(byte[] currentRow, int offset, short length) {
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Mon Dec  2 02:20:35 2013
@@ -1750,7 +1750,9 @@ public class HStore implements Store {
         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
       if (scanner == null) {
-        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
+            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
+            getScanInfo(), scan, targetCols, readPt);
       }
       return scanner;
     } finally {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Mon Dec  2 02:20:35 2013
@@ -42,9 +42,9 @@ import org.apache.hadoop.hbase.KeyValue.
  * as an InternalScanner at the Store level, you will get runtime exceptions.
  */
 @InterfaceAudience.Private
-public class KeyValueHeap extends NonLazyKeyValueScanner
+public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner {
-  private PriorityQueue<KeyValueScanner> heap = null;
+  protected PriorityQueue<KeyValueScanner> heap = null;
 
   /**
    * The current sub-scanner, i.e. the one that contains the next key/value
@@ -56,9 +56,9 @@ public class KeyValueHeap extends NonLaz
    * Bloom filter optimization, which is OK to propagate to StoreScanner. In
    * order to ensure that, always use {@link #pollRealKV()} to update current.
    */
-  private KeyValueScanner current = null;
+  protected KeyValueScanner current = null;
 
-  private KVScannerComparator comparator;
+  protected KVScannerComparator comparator;
   
   /**
    * Constructor.  This KeyValueHeap will handle closing of passed in
@@ -68,7 +68,18 @@ public class KeyValueHeap extends NonLaz
    */
   public KeyValueHeap(List<? extends KeyValueScanner> scanners,
       KVComparator comparator) throws IOException {
-    this.comparator = new KVScannerComparator(comparator);
+    this(scanners, new KVScannerComparator(comparator));
+  }
+
+  /**
+   * Constructor.
+   * @param scanners
+   * @param comparator
+   * @throws IOException
+   */
+  KeyValueHeap(List<? extends KeyValueScanner> scanners,
+      KVScannerComparator comparator) throws IOException {
+    this.comparator = comparator;
     if (!scanners.isEmpty()) {
       this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
           this.comparator);
@@ -158,8 +169,8 @@ public class KeyValueHeap extends NonLaz
     return next(result, -1);
   }
 
-  private static class KVScannerComparator implements Comparator<KeyValueScanner> {
-    private KVComparator kvComparator;
+  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
+    protected KVComparator kvComparator;
     /**
      * Constructor
      * @param kvComparator
@@ -325,7 +336,7 @@ public class KeyValueHeap extends NonLaz
    * this scanner heap if (1) it has done a real seek and (2) its KV is the top
    * among all top KVs (some of which are fake) in the scanner heap.
    */
-  private KeyValueScanner pollRealKV() throws IOException {
+  protected KeyValueScanner pollRealKV() throws IOException {
     KeyValueScanner kvScanner = heap.poll();
     if (kvScanner == null) {
       return null;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Mon Dec  2 02:20:35 2013
@@ -123,4 +123,37 @@ public interface KeyValueScanner {
    *         assumed.
    */
   boolean isFileScanner();
+
+  // Support for "Reversed Scanner"
+  /**
+   * Seek the scanner at or before the row of specified KeyValue, it firstly
+   * tries to seek the scanner at or after the specified KeyValue, return if
+   * peek KeyValue of scanner has the same row with specified KeyValue,
+   * otherwise seek the scanner at the first KeyValue of the row which is the
+   * previous row of specified KeyValue
+   * 
+   * @param key seek KeyValue
+   * @return true if the scanner is at the valid KeyValue, false if such
+   *         KeyValue does not exist
+   * 
+   */
+  public boolean backwardSeek(KeyValue key) throws IOException;
+
+  /**
+   * Seek the scanner at the first KeyValue of the row which is the previous row
+   * of specified key
+   * @param key seek value
+   * @return true if the scanner at the first valid KeyValue of previous row,
+   *         false if not existing such KeyValue
+   */
+  public boolean seekToPreviousRow(KeyValue key) throws IOException;
+
+  /**
+   * Seek the scanner at the first KeyValue of last row
+   * 
+   * @return true if scanner has values left, false if the underlying data is
+   *         empty
+   * @throws IOException
+   */
+  public boolean seekToLastRow() throws IOException;
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Dec  2 02:20:35 2013
@@ -682,6 +682,10 @@ public class MemStore implements HeapSiz
     volatile MemStoreLAB allocatorAtCreation;
     volatile MemStoreLAB snapshotAllocatorAtCreation;
     
+    // A flag represents whether could stop skipping KeyValues for MVCC
+    // if have encountered the next row. Only used for reversed scan
+    private boolean stopSkippingKVsIfNextRow = false;
+
     private long readPoint;
 
     /*
@@ -722,6 +726,7 @@ public class MemStore implements HeapSiz
     }
 
     private KeyValue getNext(Iterator<KeyValue> it) {
+      KeyValue startKV = theNext;
       KeyValue v = null;
       try {
         while (it.hasNext()) {
@@ -729,6 +734,10 @@ public class MemStore implements HeapSiz
           if (v.getMvccVersion() <= this.readPoint) {
             return v;
           }
+          if (stopSkippingKVsIfNextRow && startKV != null
+              && comparator.compareRows(v, startKV) > 0) {
+            return null;
+          }
         }
 
         return null;
@@ -907,6 +916,70 @@ public class MemStore implements HeapSiz
         long oldestUnexpiredTS) {
       return shouldSeek(scan, oldestUnexpiredTS);
     }
+
+    /**
+     * Seek scanner to the given key first. If it returns false(means
+     * peek()==null) or scanner's peek row is bigger than row of given key, seek
+     * the scanner to the previous row of given key
+     */
+    @Override
+    public synchronized boolean backwardSeek(KeyValue key) {
+      seek(key);
+      if (peek() == null || comparator.compareRows(peek(), key) > 0) {
+        return seekToPreviousRow(key);
+      }
+      return true;
+    }
+
+    /**
+     * Separately get the KeyValue before the specified key from kvset and
+     * snapshotset, and use the row of higher one as the previous row of
+     * specified key, then seek to the first KeyValue of previous row
+     */
+    @Override
+    public synchronized boolean seekToPreviousRow(KeyValue key) {
+      KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
+      SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
+      KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
+      SortedSet<KeyValue> snapshotHead = snapshotAtCreation
+          .headSet(firstKeyOnRow);
+      KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
+          .last();
+      KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
+      if (lastKVBeforeRow == null) {
+        theNext = null;
+        return false;
+      }
+      KeyValue firstKeyOnPreviousRow = KeyValue
+          .createFirstOnRow(lastKVBeforeRow.getRow());
+      this.stopSkippingKVsIfNextRow = true;
+      seek(firstKeyOnPreviousRow);
+      this.stopSkippingKVsIfNextRow = false;
+      if (peek() == null
+          || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
+        return seekToPreviousRow(lastKVBeforeRow);
+      }
+      return true;
+    }
+
+    @Override
+    public synchronized boolean seekToLastRow() {
+      KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
+          .last();
+      KeyValue second = snapshotAtCreation.isEmpty() ? null
+          : snapshotAtCreation.last();
+      KeyValue higherKv = getHighest(first, second);
+      if (higherKv == null) {
+        return false;
+      }
+      KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
+      if (seek(firstKvOnLastRow)) {
+        return true;
+      } else {
+        return seekToPreviousRow(higherKv);
+      }
+
+    }
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,54 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * A "non-reversed & non-lazy" scanner which does not support backward scanning
+ * and always does a real seek operation. Most scanners are inherited from this
+ * class.
+ */
+@InterfaceAudience.Private
+public abstract class NonReversedNonLazyKeyValueScanner extends
+    NonLazyKeyValueScanner {
+
+  @Override
+  public boolean backwardSeek(KeyValue key) throws IOException {
+    throw new NotImplementedException("backwardSeek must not be called on a "
+        + "non-reversed scanner");
+  }
+
+  @Override
+  public boolean seekToPreviousRow(KeyValue key) throws IOException {
+    throw new NotImplementedException("seekToPreviousRow must not be called on a "
+        + "non-reversed scanner");
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    throw new NotImplementedException("seekToLastRow must not be called on a "
+        + "non-reversed scanner");
+  }
+
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,192 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+
+/**
+ * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with
+ * KeyValueHeap, its scanner comparator is a little different (see
+ * ReversedKVScannerComparator), all seek is backward seek(see
+ * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row
+ * if it is already at the end of one row when calling next().
+ */
+@InterfaceAudience.Private
+public class ReversedKeyValueHeap extends KeyValueHeap {
+
+  /**
+   * @param scanners
+   * @param comparator
+   * @throws IOException
+   */
+  public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) throws IOException {
+    super(scanners, new ReversedKVScannerComparator(comparator));
+  }
+
+  @Override
+  public boolean seek(KeyValue seekKey) throws IOException {
+    throw new IllegalStateException(
+        "seek cannot be called on ReversedKeyValueHeap");
+  }
+
+  @Override
+  public boolean reseek(KeyValue seekKey) throws IOException {
+    throw new IllegalStateException(
+        "reseek cannot be called on ReversedKeyValueHeap");
+  }
+
+  @Override
+  public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom)
+      throws IOException {
+    throw new IllegalStateException(
+        "requestSeek cannot be called on ReversedKeyValueHeap");
+  }
+
+  @Override
+  public boolean seekToPreviousRow(KeyValue seekKey) throws IOException {
+    if (current == null) {
+      return false;
+    }
+    heap.add(current);
+    current = null;
+
+    KeyValueScanner scanner;
+    while ((scanner = heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if (comparator.getComparator().compareRows(topKey.getBuffer(),
+          topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(),
+          seekKey.getRowOffset(), seekKey.getRowLength()) < 0) {
+        // Row of Top KeyValue is before Seek row.
+        heap.add(scanner);
+        current = pollRealKV();
+        return current != null;
+      }
+
+      if (!scanner.seekToPreviousRow(seekKey)) {
+        scanner.close();
+      } else {
+        heap.add(scanner);
+      }
+    }
+
+    // Heap is returning empty, scanner is done
+    return false;
+  }
+
+  @Override
+  public boolean backwardSeek(KeyValue seekKey) throws IOException {
+    if (current == null) {
+      return false;
+    }
+    heap.add(current);
+    current = null;
+
+    KeyValueScanner scanner;
+    while ((scanner = heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator
+          .getComparator().compare(seekKey, topKey) <= 0)
+          || comparator.getComparator().compareRows(seekKey, topKey) > 0) {
+        heap.add(scanner);
+        current = pollRealKV();
+        return current != null;
+      }
+      if (!scanner.backwardSeek(seekKey)) {
+        scanner.close();
+      } else {
+        heap.add(scanner);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public KeyValue next() throws IOException {
+    if (this.current == null) {
+      return null;
+    }
+    KeyValue kvReturn = this.current.next();
+    KeyValue kvNext = this.current.peek();
+    if (kvNext == null
+        || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
+      if (this.current.seekToPreviousRow(kvReturn)) {
+        this.heap.add(this.current);
+      } else {
+        this.current.close();
+      }
+      this.current = pollRealKV();
+    } else {
+      KeyValueScanner topScanner = this.heap.peek();
+      if (topScanner != null
+          && this.comparator.compare(this.current, topScanner) > 0) {
+        this.heap.add(this.current);
+        this.current = pollRealKV();
+      }
+    }
+    return kvReturn;
+  }
+
+  /**
+   * In ReversedKVScannerComparator, we compare the row of scanners' peek values
+   * first, sort bigger one before the smaller one. Then compare the KeyValue if
+   * they have the equal row, sort smaller one before the bigger one
+   */
+  private static class ReversedKVScannerComparator extends
+      KVScannerComparator {
+
+    /**
+     * Constructor
+     * @param kvComparator
+     */
+    public ReversedKVScannerComparator(KVComparator kvComparator) {
+      super(kvComparator);
+    }
+
+    @Override
+    public int compare(KeyValueScanner left, KeyValueScanner right) {
+      int rowComparison = compareRows(left.peek(), right.peek());
+      if (rowComparison != 0) {
+        return -rowComparison;
+      }
+      return super.compare(left, right);
+    }
+
+    /**
+     * Compares rows of two KeyValue
+     * @param left
+     * @param right
+     * @return less than 0 if left is smaller, 0 if equal etc..
+     */
+    public int compareRows(KeyValue left, KeyValue right) {
+      return super.kvComparator.compareRows(left, right);
+    }
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    throw new NotImplementedException("Not implemented");
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,81 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+
+/**
+ * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to
+ * support reversed scanning.
+ */
+@InterfaceAudience.Private
+class ReversedRegionScannerImpl extends RegionScannerImpl {
+
+  /**
+   * @param scan
+   * @param additionalScanners
+   * @param region
+   * @throws IOException
+   */
+  ReversedRegionScannerImpl(Scan scan,
+      List<KeyValueScanner> additionalScanners, HRegion region)
+      throws IOException {
+    region.super(scan, additionalScanners, region);
+  }
+
+  @Override
+  protected void initializeKVHeap(List<KeyValueScanner> scanners,
+      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
+    this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator());
+    if (!joinedScanners.isEmpty()) {
+      this.joinedHeap = new ReversedKeyValueHeap(joinedScanners,
+          region.getComparator());
+    }
+  }
+
+  @Override
+  protected boolean isStopRow(byte[] currentRow, int offset, short length) {
+    return currentRow == null
+        || (super.stopRow != null && region.getComparator().compareRows(
+            stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan);
+  }
+
+  @Override
+  protected boolean nextRow(byte[] currentRow, int offset, short length)
+      throws IOException {
+    assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
+    byte row[] = new byte[length];
+    System.arraycopy(currentRow, offset, row, 0, length);
+    this.storeHeap.seekToPreviousRow(KeyValue.createFirstOnRow(row));
+    resetFilters();
+    // Calling the hook in CP which allows it to do a fast forward
+    if (this.region.getCoprocessorHost() != null) {
+      return this.region.getCoprocessorHost().postScannerFilterRow(this,
+          currentRow);
+    }
+    return true;
+  }
+
+}
\ No newline at end of file

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java?rev=1546878&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java Mon Dec  2 02:20:35 2013
@@ -0,0 +1,145 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * ReversedStoreScanner extends from StoreScanner, and is used to support
+ * reversed scanning.
+ */
+@InterfaceAudience.Private
+class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
+
+  /**
+   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
+   * are not in a compaction.
+   * 
+   * @param store who we scan
+   * @param scanInfo
+   * @param scan the spec
+   * @param columns which columns we are scanning
+   * @throws IOException
+   */
+  ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      NavigableSet<byte[]> columns, long readPt)
+      throws IOException {
+    super(store, scanInfo, scan, columns, readPt);
+  }
+
+  /** Constructor for testing. */
+  ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
+      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
+      throws IOException {
+    super(scan, scanInfo, scanType, columns, scanners,
+        HConstants.LATEST_TIMESTAMP);
+  }
+
+  @Override
+  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) throws IOException {
+    // Combine all seeked scanners with a heap
+    heap = new ReversedKeyValueHeap(scanners, comparator);
+  }
+
+  @Override
+  protected void seekScanners(List<? extends KeyValueScanner> scanners,
+      KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
+      throws IOException {
+    // Seek all scanners to the start of the Row (or if the exact matching row
+    // key does not exist, then to the start of the previous matching Row).
+    if (seekKey.matchingRow(HConstants.EMPTY_START_ROW)) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.seekToLastRow();
+      }
+    } else {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.backwardSeek(seekKey);
+      }
+    }
+  }
+
+  @Override
+  protected boolean seekToNextRow(KeyValue kv) throws IOException {
+    return seekToPreviousRow(kv);
+  }
+
+  /**
+   * Do a backwardSeek in a reversed StoreScanner(scan backward)
+   */
+  @Override
+  protected boolean seekAsDirection(KeyValue kv) throws IOException {
+    return backwardSeek(kv);
+  }
+
+  @Override
+  protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
+      KeyValue.KVComparator comparator) throws IOException {
+    // Check that the heap gives us KVs in an increasing order for same row and
+    // decreasing order for different rows.
+    assert prevKV == null || comparator == null || comparator.compareRows(kv, prevKV) < 0
+        || (comparator.matchingRows(kv, prevKV) && comparator.compare(kv,
+            prevKV) >= 0) : "Key " + prevKV
+        + " followed by a " + "error order key " + kv + " in cf " + store
+        + " in reversed scan";
+  }
+
+  @Override
+  public boolean reseek(KeyValue kv) throws IOException {
+    throw new IllegalStateException(
+        "reseek cannot be called on ReversedStoreScanner");
+  }
+
+  @Override
+  public boolean seek(KeyValue key) throws IOException {
+    throw new IllegalStateException(
+        "seek cannot be called on ReversedStoreScanner");
+  }
+
+  @Override
+  public boolean seekToPreviousRow(KeyValue key) throws IOException {
+    lock.lock();
+    try {
+      checkReseek();
+      return this.heap.seekToPreviousRow(key);
+    } finally {
+      lock.unlock();
+    }
+
+  }
+  
+  @Override
+  public boolean backwardSeek(KeyValue key) throws IOException {
+    lock.lock();
+    try {
+      checkReseek();
+      return this.heap.backwardSeek(key);
+    } finally {
+      lock.unlock();
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Mon Dec  2 02:20:35 2013
@@ -132,6 +132,8 @@ public class ScanQueryMatcher {
   
   private final boolean isUserScan;
 
+  private final boolean isReversed;
+
   /**
    * Construct a QueryMatcher for a scan
    * @param scan
@@ -186,6 +188,7 @@ public class ScanQueryMatcher {
       this.columns = new ExplicitColumnTracker(columns,
           scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
     }
+    this.isReversed = scan.isReversed();
   }
 
   /**
@@ -258,15 +261,24 @@ public class ScanQueryMatcher {
 
     int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
         bytes, offset, rowLength);
-    if (ret <= -1) {
-      return MatchCode.DONE;
-    } else if (ret >= 1) {
-      // could optimize this, if necessary?
-      // Could also be called SEEK_TO_CURRENT_ROW, but this
-      // should be rare/never happens.
-      return MatchCode.SEEK_NEXT_ROW;
+    if (!this.isReversed) {
+      if (ret <= -1) {
+        return MatchCode.DONE;
+      } else if (ret >= 1) {
+        // could optimize this, if necessary?
+        // Could also be called SEEK_TO_CURRENT_ROW, but this
+        // should be rare/never happens.
+        return MatchCode.SEEK_NEXT_ROW;
+      }
+    } else {
+      if (ret <= -1) {
+        return MatchCode.SEEK_NEXT_ROW;
+      } else if (ret >= 1) {
+        return MatchCode.DONE;
+      }
     }
 
+
     // optimize case.
     if (this.stickyNextRow)
         return MatchCode.SEEK_NEXT_ROW;
@@ -454,6 +466,14 @@ public class ScanQueryMatcher {
   }
 
   public boolean moreRowsMayExistAfter(KeyValue kv) {
+    if (this.isReversed) {
+      if (rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(),
+          kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) {
+        return false;
+      } else {
+        return true;
+      }
+    }
     if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
         rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
             kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Mon Dec  2 02:20:35 2013
@@ -1315,11 +1315,18 @@ public class StoreFile {
           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         return true;
       }
-      KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow());
-      KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow());
-      boolean nonOverLapping = (getComparator().compareFlatKey(this.getFirstKey(),
-        stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
-          || getComparator().compareFlatKey(this.getLastKey(), startKeyValue.getKey()) < 0;
+      KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValue
+          .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan
+          .getStartRow());
+      KeyValue largestScanKeyValue = scan.isReversed() ? KeyValue
+          .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan
+          .getStopRow());
+      boolean nonOverLapping = (getComparator().compareFlatKey(
+          this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
+          .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
+              HConstants.EMPTY_END_ROW))
+          || getComparator().compareFlatKey(this.getLastKey(),
+              smallestScanKeyValue.getKey()) < 0;
       return !nonOverLapping;
     }
 
@@ -1426,6 +1433,10 @@ public class StoreFile {
       return reader.getLastKey();
     }
 
+    public byte[] getLastRowKey() {
+      return reader.getLastRowKey();
+    }
+
     public byte[] midkey() throws IOException {
       return reader.midkey();
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Mon Dec  2 02:20:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * KeyValueScanner adaptor over the Reader.  It also provides hooks into
@@ -54,6 +55,9 @@ public class StoreFileScanner implements
 
   private boolean enforceMVCC = false;
   private boolean hasMVCCInfo = false;
+  // A flag represents whether could stop skipping KeyValues for MVCC
+  // if have encountered the next row. Only used for reversed scan
+  private boolean stopSkippingKVsIfNextRow = false;
 
   private static AtomicLong seekCount;
 
@@ -186,11 +190,18 @@ public class StoreFileScanner implements
   protected boolean skipKVsNewerThanReadpoint() throws IOException {
     // We want to ignore all key-values that are newer than our current
     // readPoint
+    KeyValue startKV = cur;
     while(enforceMVCC
         && cur != null
         && (cur.getMvccVersion() > readPt)) {
       hfs.next();
       cur = hfs.getKeyValue();
+      if (this.stopSkippingKVsIfNextRow
+          && Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
+              cur.getRowLength(), startKV.getBuffer(), startKV.getRowOffset(),
+              startKV.getRowLength()) > 0) {
+        return false;
+      }
     }
 
     if (cur == null) {
@@ -389,4 +400,76 @@ public class StoreFileScanner implements
     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
   }
+
+  @Override
+  public boolean seekToPreviousRow(KeyValue key) throws IOException {
+    try {
+      try {
+        KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow());
+        if (seekCount != null) seekCount.incrementAndGet();
+        if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
+            seekKey.getKeyLength())) {
+          close();
+          return false;
+        }
+        KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs
+            .getKeyValue().getRow());
+
+        if (seekCount != null) seekCount.incrementAndGet();
+        if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
+          close();
+          return false;
+        }
+
+        cur = hfs.getKeyValue();
+        this.stopSkippingKVsIfNextRow = true;
+        boolean resultOfSkipKVs;
+        try {
+          resultOfSkipKVs = skipKVsNewerThanReadpoint();
+        } finally {
+          this.stopSkippingKVsIfNextRow = false;
+        }
+        if (!resultOfSkipKVs
+            || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
+                cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(),
+                firstKeyOfPreviousRow.getRowOffset(),
+                firstKeyOfPreviousRow.getRowLength()) > 0) {
+          return seekToPreviousRow(firstKeyOfPreviousRow);
+        }
+
+        return true;
+      } finally {
+        realSeekDone = true;
+      }
+    } catch (IOException ioe) {
+      throw new IOException("Could not seekToPreviousRow " + this + " to key "
+          + key, ioe);
+    }
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    byte[] lastRow = reader.getLastRowKey();
+    if (lastRow == null) {
+      return false;
+    }
+    KeyValue seekKey = KeyValue.createFirstOnRow(lastRow);
+    if (seek(seekKey)) {
+      return true;
+    } else {
+      return seekToPreviousRow(seekKey);
+    }
+  }
+
+  @Override
+  public boolean backwardSeek(KeyValue key) throws IOException {
+    seek(key);
+    if (cur == null
+        || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
+            cur.getRowLength(), key.getBuffer(), key.getRowOffset(),
+            key.getRowLength()) > 0) {
+      return seekToPreviousRow(key);
+    }
+    return true;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1546878&r1=1546877&r2=1546878&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Mon Dec  2 02:20:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
@@ -48,7 +49,7 @@ import org.apache.hadoop.hbase.util.Envi
  * into List<KeyValue> for a single row.
  */
 @InterfaceAudience.Private
-public class StoreScanner extends NonLazyKeyValueScanner
+public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
   protected Store store;
@@ -97,7 +98,7 @@ public class StoreScanner extends NonLaz
 
   // A flag whether use pread for scan
   private boolean scanUsePread = false;
-  private ReentrantLock lock = new ReentrantLock();
+  protected ReentrantLock lock = new ReentrantLock();
   
   private final long readPt;
 
@@ -172,19 +173,8 @@ public class StoreScanner extends NonLaz
     // key does not exist, then to the start of the next matching Row).
     // Always check bloom filter to optimize the top row seek for delete
     // family marker.
-    if (explicitColumnQuery && lazySeekEnabledGlobally) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.requestSeek(matcher.getStartKey(), false, true);
-      }
-    } else {
-      if (!isParallelSeekEnabled) {
-        for (KeyValueScanner scanner : scanners) {
-          scanner.seek(matcher.getStartKey());
-        }
-      } else {
-        parallelSeek(scanners, matcher.getStartKey());
-      }
-    }
+    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
+        && lazySeekEnabledGlobally, isParallelSeekEnabled);
 
     // set storeLimit
     this.storeLimit = scan.getMaxResultsPerColumnFamily();
@@ -193,7 +183,7 @@ public class StoreScanner extends NonLaz
     this.storeOffset = scan.getRowOffsetPerColumnFamily();
 
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.getComparator());
+    resetKVHeap(scanners, store.getComparator());
   }
 
   /**
@@ -249,16 +239,10 @@ public class StoreScanner extends NonLaz
     scanners = selectScannersFrom(scanners);
 
     // Seek all scanners to the initial key
-    if (!isParallelSeekEnabled) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.seek(matcher.getStartKey());
-      }
-    } else {
-      parallelSeek(scanners, matcher.getStartKey());
-    }
+    seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
 
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.getComparator());
+    resetKVHeap(scanners, store.getComparator());
   }
 
   /** Constructor for testing. */
@@ -295,14 +279,8 @@ public class StoreScanner extends NonLaz
       this.store.addChangedReaderObserver(this);
     }
     // Seek all scanners to the initial key
-    if (!isParallelSeekEnabled) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.seek(matcher.getStartKey());
-      }
-    } else {
-      parallelSeek(scanners, matcher.getStartKey());
-    }
-    heap = new KeyValueHeap(scanners, scanInfo.getComparator());
+    seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
+    resetKVHeap(scanners, scanInfo.getComparator());
   }
 
   /**
@@ -317,6 +295,42 @@ public class StoreScanner extends NonLaz
   }
 
   /**
+   * Seek the specified scanners with the given key
+   * @param scanners
+   * @param seekKey
+   * @param isLazy true if using lazy seek
+   * @param isParallelSeek true if using parallel seek
+   * @throws IOException
+   */
+  protected void seekScanners(List<? extends KeyValueScanner> scanners,
+      KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
+      throws IOException {
+    // Seek all scanners to the start of the Row (or if the exact matching row
+    // key does not exist, then to the start of the next matching Row).
+    // Always check bloom filter to optimize the top row seek for delete
+    // family marker.
+    if (isLazy) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.requestSeek(seekKey, false, true);
+      }
+    } else {
+      if (!isParallelSeek) {
+        for (KeyValueScanner scanner : scanners) {
+          scanner.seek(seekKey);
+        }
+      } else {
+        parallelSeek(scanners, seekKey);
+      }
+    }
+  }
+
+  protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) throws IOException {
+    // Combine all seeked scanners with a heap
+    heap = new KeyValueHeap(scanners, comparator);
+  }
+
+  /**
    * Filters the given list of scanners using Bloom filter, time range, and
    * TTL.
    */
@@ -451,9 +465,7 @@ public class StoreScanner extends NonLaz
     int count = 0;
     LOOP: while((kv = this.heap.peek()) != null) {
       if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
-      // Check that the heap gives us KVs in an increasing order.
-      assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
-        "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
+      checkScanOrder(prevKV, kv, comparator);
       prevKV = kv;
 
       ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
@@ -475,7 +487,7 @@ public class StoreScanner extends NonLaz
             if (!matcher.moreRowsMayExistAfter(kv)) {
               return false;
             }
-            reseek(matcher.getKeyForNextRow(kv));
+            seekToNextRow(kv);
             break LOOP;
           }
 
@@ -490,9 +502,9 @@ public class StoreScanner extends NonLaz
             if (!matcher.moreRowsMayExistAfter(kv)) {
               return false;
             }
-            reseek(matcher.getKeyForNextRow(kv));
+            seekToNextRow(kv);
           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
-            reseek(matcher.getKeyForNextColumn(kv));
+            seekAsDirection(matcher.getKeyForNextColumn(kv));
           } else {
             this.heap.next();
           }
@@ -516,11 +528,11 @@ public class StoreScanner extends NonLaz
             return false;
           }
 
-          reseek(matcher.getKeyForNextRow(kv));
+          seekToNextRow(kv);
           break;
 
         case SEEK_NEXT_COL:
-          reseek(matcher.getKeyForNextColumn(kv));
+          seekAsDirection(matcher.getKeyForNextColumn(kv));
           break;
 
         case SKIP:
@@ -531,7 +543,7 @@ public class StoreScanner extends NonLaz
           // TODO convert resee to Cell?
           KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
           if (nextKV != null) {
-            reseek(nextKV);
+            seekAsDirection(nextKV);
           } else {
             heap.next();
           }
@@ -619,16 +631,11 @@ public class StoreScanner extends NonLaz
      * could have done it now by storing the scan object from the constructor */
     List<KeyValueScanner> scanners = getScannersNoCompaction();
 
-    if (!isParallelSeekEnabled) {
-      for (KeyValueScanner scanner : scanners) {
-        scanner.seek(lastTopKey);
-      }
-    } else {
-      parallelSeek(scanners, lastTopKey);
-    }
+    // Seek all scanners to the initial key
+    seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
 
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.getComparator());
+    resetKVHeap(scanners, store.getComparator());
 
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the
@@ -648,6 +655,36 @@ public class StoreScanner extends NonLaz
     }
   }
 
+  /**
+   * Check whether scan as expected order
+   * @param prevKV
+   * @param kv
+   * @param comparator
+   * @throws IOException
+   */
+  protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
+      KeyValue.KVComparator comparator) throws IOException {
+    // Check that the heap gives us KVs in an increasing order.
+    assert prevKV == null || comparator == null
+        || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
+        + " followed by a " + "smaller key " + kv + " in cf " + store;
+  }
+
+  protected boolean seekToNextRow(KeyValue kv) throws IOException {
+    return reseek(matcher.getKeyForNextRow(kv));
+  }
+
+  /**
+   * Do a reseek in a normal StoreScanner(scan forward)
+   * @param kv
+   * @return true if scanner has values left, false if end of scanner
+   * @throws IOException
+   */
+  protected boolean seekAsDirection(KeyValue kv)
+      throws IOException {
+    return reseek(kv);
+  }
+
   @Override
   public boolean reseek(KeyValue kv) throws IOException {
     lock.lock();



Mime
View raw message