hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1402214 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/apache...
Date Thu, 25 Oct 2012 16:43:23 GMT
Author: stack
Date: Thu Oct 25 16:43:22 2012
New Revision: 1402214

URL: http://svn.apache.org/viewvc?rev=1402214&view=rev
Log:
HBASE-5974 Scanner retry behavior with RPC timeout on next() seems incorrect

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
    hbase/trunk/hbase-server/src/main/protobuf/Client.proto

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java?rev=1402214&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java Thu Oct 25 16:43:22 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown by a RegionServer while doing next() calls on a ResultScanner. Both client and server
+ * maintain a nextCallSeq and if they do not match, RS will throw this exception.
+ */
+@InterfaceAudience.Private
+public class OutOfOrderScannerNextException extends DoNotRetryIOException {
+
+  private static final long serialVersionUID = 4595751007554273567L;
+
+  public OutOfOrderScannerNextException() {
+    super();
+  }
+
+  public OutOfOrderScannerNextException(String msg) {
+    super(msg);
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Thu Oct 25 16:43:22 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@@ -295,8 +296,9 @@ public class ClientScanner extends Abstr
               }
             } else {
               Throwable cause = e.getCause();
-              if (cause == null || (!(cause instanceof NotServingRegionException)
-                  && !(cause instanceof RegionServerStoppedException))) {
+              if ((cause == null || (!(cause instanceof NotServingRegionException)
+                  && !(cause instanceof RegionServerStoppedException)))
+                  && !(e instanceof OutOfOrderScannerNextException)) {
                 throw e;
               }
             }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Thu Oct 25 16:43:22 2012
@@ -67,7 +67,8 @@ public class ScannerCallable extends Ser
 
   // indicate if it is a remote server call
   private boolean isRegionServerRemote = true;
-
+  private long nextCallSeq = 0;
+  
   /**
    * @param connection which connection
    * @param tableName table callable is on
@@ -138,9 +139,19 @@ public class ScannerCallable extends Ser
         try {
           incRPCcallsMetrics();
           ScanRequest request =
-            RequestConverter.buildScanRequest(scannerId, caching, false);
+            RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
           try {
             ScanResponse response = server.scan(null, request);
+            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
+            // from client to server will increment this number in both sides. Client passes this
+            // number along with the request and at RS side both the incoming nextCallSeq and its
+            // nextCallSeq will be matched. In case of a timeout this increment at the client side
+            // should not happen. If at the server side fetching of next batch of data was over,
+            // there will be mismatch in the nextCallSeq number. Server will throw
+            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
+            // as the last successfully retrieved row.
+            // See HBASE-5974
+            nextCallSeq++;
             long timestamp = System.currentTimeMillis();
             rrs = ResponseConverter.getResults(response);
             if (logScannerActivity) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Thu Oct 25 16:43:22 2012
@@ -416,6 +416,25 @@ public final class RequestConverter {
     builder.setScannerId(scannerId);
     return builder.build();
   }
+  
+  /**
+   * Create a protocol buffer ScanRequest for a scanner id
+   * 
+   * @param scannerId
+   * @param numberOfRows
+   * @param closeScanner
+   * @param nextCallSeq
+   * @return a scan request
+   */
+  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
+      final boolean closeScanner, final long nextCallSeq) {
+    ScanRequest.Builder builder = ScanRequest.newBuilder();
+    builder.setNumberOfRows(numberOfRows);
+    builder.setCloseScanner(closeScanner);
+    builder.setScannerId(scannerId);
+    builder.setNextCallSeq(nextCallSeq);
+    return builder.build();
+  }
 
   /**
    * Create a protocol buffer LockRowRequest

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Thu Oct 25 16:43:22 2012
@@ -10485,6 +10485,10 @@ public final class ClientProtos {
     // optional bool closeScanner = 5;
     boolean hasCloseScanner();
     boolean getCloseScanner();
+    
+    // optional uint64 nextCallSeq = 6;
+    boolean hasNextCallSeq();
+    long getNextCallSeq();
   }
   public static final class ScanRequest extends
       com.google.protobuf.GeneratedMessage
@@ -10571,12 +10575,23 @@ public final class ClientProtos {
       return closeScanner_;
     }
     
+    // optional uint64 nextCallSeq = 6;
+    public static final int NEXTCALLSEQ_FIELD_NUMBER = 6;
+    private long nextCallSeq_;
+    public boolean hasNextCallSeq() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getNextCallSeq() {
+      return nextCallSeq_;
+    }
+    
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
       scannerId_ = 0L;
       numberOfRows_ = 0;
       closeScanner_ = false;
+      nextCallSeq_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10617,6 +10632,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBool(5, closeScanner_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt64(6, nextCallSeq_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -10646,6 +10664,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(5, closeScanner_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(6, nextCallSeq_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -10694,6 +10716,11 @@ public final class ClientProtos {
         result = result && (getCloseScanner()
             == other.getCloseScanner());
       }
+      result = result && (hasNextCallSeq() == other.hasNextCallSeq());
+      if (hasNextCallSeq()) {
+        result = result && (getNextCallSeq()
+            == other.getNextCallSeq());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -10723,6 +10750,10 @@ public final class ClientProtos {
         hash = (37 * hash) + CLOSESCANNER_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getCloseScanner());
       }
+      if (hasNextCallSeq()) {
+        hash = (37 * hash) + NEXTCALLSEQ_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getNextCallSeq());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -10859,6 +10890,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000008);
         closeScanner_ = false;
         bitField0_ = (bitField0_ & ~0x00000010);
+        nextCallSeq_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -10925,6 +10958,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000010;
         }
         result.closeScanner_ = closeScanner_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.nextCallSeq_ = nextCallSeq_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -10956,6 +10993,9 @@ public final class ClientProtos {
         if (other.hasCloseScanner()) {
           setCloseScanner(other.getCloseScanner());
         }
+        if (other.hasNextCallSeq()) {
+          setNextCallSeq(other.getNextCallSeq());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11032,6 +11072,11 @@ public final class ClientProtos {
               closeScanner_ = input.readBool();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              nextCallSeq_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -11281,6 +11326,27 @@ public final class ClientProtos {
         return this;
       }
       
+      // optional uint64 nextCallSeq = 6;
+      private long nextCallSeq_ ;
+      public boolean hasNextCallSeq() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getNextCallSeq() {
+        return nextCallSeq_;
+      }
+      public Builder setNextCallSeq(long value) {
+        bitField0_ |= 0x00000020;
+        nextCallSeq_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearNextCallSeq() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        nextCallSeq_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:ScanRequest)
     }
     
@@ -23715,59 +23781,60 @@ public final class ClientProtos {
       "Range\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersions\030\007",
       " \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022\021\n\tba"
+
       "tchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(\004\022\022\n\n" +
-      "storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\203\001" +
+      "storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\230\001" +
       "\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.RegionSp" +
       "ecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscannerI" +
       "d\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014closeSc"
+
-      "anner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006result\030\001 " +
-      "\003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreR" +
-      "esults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequ" +
-      "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\013\n",
-      "\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
-      "\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
-      "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
-      "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" +
-      "adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
-      "pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
-      "FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" +
-      " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa"
+
-      "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" +
-      "ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto",
-      "colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" +
-      "perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" +
-      "r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" +
-      "orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
-      "ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" +
-      "sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" +
-      "ir\"_\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" +
-      "\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002"
+
-      "(\t\022\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServic" +
-      "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
-      "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
-      "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
-      "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
-      "2\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006mutat" +
-      "e\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004e"
+
-      "xec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005valu" +
-      "e\030\001 \001(\0132\016.NameBytesPair\022!\n\texception\030\002 \001" +
-      "(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006re" +
-      "gion\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002" +
-      " \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMu",
-      "ltiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionResu" +
-      "lt2\331\003\n\rClientService\022 \n\003get\022\013.GetRequest" +
-      "\032\014.GetResponse\022)\n\006mutate\022\016.MutateRequest" +
-      "\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequest\032" +
-      "\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReque" +
-      "st\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Unlo" +
-      "ckRowRequest\032\022.UnlockRowResponse\022>\n\rbulk" +
-      "LoadHFile\022\025.BulkLoadHFileRequest\032\026.BulkL" +
-      "oadHFileResponse\022D\n\017execCoprocessor\022\027.Ex" +
-      "ecCoprocessorRequest\032\030.ExecCoprocessorRe",
-      "sponse\022F\n\013execService\022\032.CoprocessorServi" +
-      "ceRequest\032\033.CoprocessorServiceResponse\022&" +
-      "\n\005multi\022\r.MultiRequest\032\016.MultiResponseBB" +
-      "\n*org.apache.hadoop.hbase.protobuf.gener" +
-      "atedB\014ClientProtosH\001\210\001\001\240\001\001"
+      "anner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Scan"
+
+      "Response\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\tsca" +
+      "nnerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl"
+
+      "\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002(\013",
+      "2\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017LockR" +
+      "owResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\""
+
+      "D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020.Re" +
+      "gionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021UnlockR" +
+      "owResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n\006r" +
+      "egion\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamilyP" +
+      "ath\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyP" +
+      "ath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPath\022" +
+      "\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoa"
+
+      "dHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022",
+      "\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nme"
+
+      "thodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameS" +
+      "tringPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytes" +
+      "Pair\"O\n\026ExecCoprocessorRequest\022 \n\006region" +
+      "\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005"
+
+      ".Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005valu" +
+      "e\030\001 \002(\0132\016.NameBytesPair\"_\n\026CoprocessorSe" +
+      "rviceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 " +
+      "\002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014"
+
+      "\"d\n\031CoprocessorServiceRequest\022 \n\006region\030",
+      "\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027." +
+      "CoprocessorServiceCall\"]\n\032CoprocessorSer" +
+      "viceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
+      "cifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"N\n" +
+      "\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n" +
+      "\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n"
+
+      "\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.NameBytes" +
+      "Pair\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" +
+      "^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.Region" +
+      "Specifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022",
+      "\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006resu" +
+      "lt\030\001 \003(\0132\r.ActionResult2\331\003\n\rClientServic" +
+      "e\022 \n\003get\022\013.GetRequest\032\014.GetResponse\022)\n\006m" +
+      "utate\022\016.MutateRequest\032\017.MutateResponse\022#" +
+      "\n\004scan\022\014.ScanRequest\032\r.ScanResponse\022,\n\007l" +
+      "ockRow\022\017.LockRowRequest\032\020.LockRowRespons" +
+      "e\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022.Unlo" +
+      "ckRowResponse\022>\n\rbulkLoadHFile\022\025.BulkLoa" +
+      "dHFileRequest\032\026.BulkLoadHFileResponse\022D\n" +
+      "\017execCoprocessor\022\027.ExecCoprocessorReques",
+      "t\032\030.ExecCoprocessorResponse\022F\n\013execServi" +
+      "ce\022\032.CoprocessorServiceRequest\032\033.Coproce" +
+      "ssorServiceResponse\022&\n\005multi\022\r.MultiRequ" +
+      "est\032\016.MultiResponseBB\n*org.apache.hadoop" +
+      ".hbase.protobuf.generatedB\014ClientProtosH" +
+      "\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23875,7 +23942,7 @@ public final class ClientProtos {
           internal_static_ScanRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ScanRequest_descriptor,
-              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", },
+              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.Builder.class);
           internal_static_ScanResponse_descriptor =

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Oct 25 16:43:22 2012
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.RegionMovedException;
 import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -280,8 +281,8 @@ public class  HRegionServer implements C
   // Compactions
   public CompactSplitThread compactSplitThread;
 
-  final ConcurrentHashMap<String, RegionScanner> scanners =
-      new ConcurrentHashMap<String, RegionScanner>();
+  final ConcurrentHashMap<String, RegionScannerHolder> scanners =
+      new ConcurrentHashMap<String, RegionScannerHolder>();
 
   /**
    * Map of regions currently being served by this region server. Key is the
@@ -560,7 +561,11 @@ public class  HRegionServer implements C
 
   RegionScanner getScanner(long scannerId) {
     String scannerIdString = Long.toString(scannerId);
-    return scanners.get(scannerIdString);
+    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
+    if (scannerHolder != null) {
+      return scannerHolder.s;
+    }
+    return null;
   }
 
   /**
@@ -1140,9 +1145,9 @@ public class  HRegionServer implements C
   private void closeAllScanners() {
     // Close any outstanding scanners. Means they'll get an UnknownScanner
     // exception next time they come in.
-    for (Map.Entry<String, RegionScanner> e : this.scanners.entrySet()) {
+    for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
       try {
-        e.getValue().close();
+        e.getValue().s.close();
       } catch (IOException ioe) {
         LOG.warn("Closing scanner " + e.getKey(), ioe);
       }
@@ -2536,8 +2541,9 @@ public class  HRegionServer implements C
     }
 
     public void leaseExpired() {
-      RegionScanner s = scanners.remove(this.scannerName);
-      if (s != null) {
+      RegionScannerHolder rsh = scanners.remove(this.scannerName);
+      if (rsh != null) {
+        RegionScanner s = rsh.s;
         LOG.info("Scanner " + this.scannerName + " lease expired on region "
             + s.getRegionInfo().getRegionNameAsString());
         try {
@@ -2841,7 +2847,7 @@ public class  HRegionServer implements C
       scannerId = rand.nextLong();
       if (scannerId == -1) continue;
       String scannerName = String.valueOf(scannerId);
-      RegionScanner existing = scanners.putIfAbsent(scannerName, s);
+      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
       if (existing == null) {
         this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
             new ScannerListener(scannerName));
@@ -3073,6 +3079,7 @@ public class  HRegionServer implements C
         int ttl = 0;
         HRegion region = null;
         RegionScanner scanner = null;
+        RegionScannerHolder rsh = null;
         boolean moreResults = true;
         boolean closeScanner = false;
         ScanResponse.Builder builder = ScanResponse.newBuilder();
@@ -3084,11 +3091,12 @@ public class  HRegionServer implements C
           rows = request.getNumberOfRows();
         }
         if (request.hasScannerId()) {
-          scanner = scanners.get(scannerName);
-          if (scanner == null) {
+          rsh = scanners.get(scannerName);
+          if (rsh == null) {
             throw new UnknownScannerException(
               "Name: " + scannerName + ", already closed?");
           }
+          scanner = rsh.s;
           region = getRegion(scanner.getRegionInfo().getRegionName());
         } else {
           region = getRegion(request.getRegion());
@@ -3110,6 +3118,22 @@ public class  HRegionServer implements C
         }
 
         if (rows > 0) {
+          // if nextCallSeq does not match throw Exception straight away. This needs to be
+          // performed even before checking of Lease.
+          // See HBASE-5974
+          if (request.hasNextCallSeq()) {
+            if (rsh == null) {
+              rsh = scanners.get(scannerName);
+            }
+            if (rsh != null) {
+              if (request.getNextCallSeq() != rsh.nextCallSeq) {
+                throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+                    + " But the nextCallSeq got from client: " + request.getNextCallSeq());
+              }
+              // Increment the nextCallSeq value which is the next expected from client.
+              rsh.nextCallSeq++;
+            }
+          }
           try {
             // Remove lease while its being processed in server; protects against case
             // where processing of request takes > lease expiration time.
@@ -3193,8 +3217,9 @@ public class  HRegionServer implements C
               return builder.build(); // bypass
             }
           }
-          scanner = scanners.remove(scannerName);
-          if (scanner != null) {
+          rsh = scanners.remove(scannerName);
+          if (rsh != null) {
+            scanner = rsh.s;
             scanner.close();
             leases.cancelLease(scannerName);
             if (region != null && region.getCoprocessorHost() != null) {
@@ -4135,4 +4160,16 @@ public class  HRegionServer implements C
   private String getMyEphemeralNodePath() {
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
+  
+  /**
+   * Holder class which holds the RegionScanner and nextCallSeq together.
+   */
+  private static class RegionScannerHolder {
+    private RegionScanner s;
+    private long nextCallSeq = 0L;
+
+    public RegionScannerHolder(RegionScanner s) {
+      this.s = s;
+    }
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Thu Oct 25 16:43:22 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
@@ -82,7 +83,9 @@ public class JVMClusterUtil {
   throws IOException {
     HRegionServer server;
     try {
-      server = hrsc.getConstructor(Configuration.class).newInstance(c);
+      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
+      ctor.setAccessible(true);
+      server = ctor.newInstance(c);
     } catch (InvocationTargetException ite) {
       Throwable target = ite.getTargetException();
       throw new RuntimeException("Failed construction of RegionServer: " +

Modified: hbase/trunk/hbase-server/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Client.proto?rev=1402214&r1=1402213&r2=1402214&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Client.proto Thu Oct 25 16:43:22 2012
@@ -209,6 +209,7 @@ message ScanRequest {
   optional uint64 scannerId = 3;
   optional uint32 numberOfRows = 4;
   optional bool closeScanner = 5;
+  optional uint64 nextCallSeq = 6;
 }
 
 /**

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java?rev=1402214&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java Thu Oct 25 16:43:22 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Tests the scenario where an HRegionServer#scan() call made while scanning times out on the
+ * client side and is retried. The retry must not cause the region server to skip any rows.
+ */
+@Category(MediumTests.class)
+public class TestClientScannerRPCTimeout {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static final byte[] VALUE = Bytes.toBytes("testValue");
+  /** Client RPC timeout in milliseconds; the stalled scan call sleeps 500 ms longer than this. */
+  private static final int rpcTimeout = 2 * 1000;
+
+  /**
+   * Starts a single-node mini cluster whose region server is {@link RegionServerWithScanTimeout}
+   * and whose RPC timeout is {@link #rpcTimeout}.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
+    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Writes three rows, scans them one row per RPC, and makes the region server stall the scan
+   * call carrying sequence number 1. The scan must still return row-1, row-2 and row-3 in order.
+   */
+  @Test
+  public void testScannerNextRPCTimesout() throws Exception {
+    final byte[] TABLE_NAME = Bytes.toBytes("testScannerNextRPCTimesout");
+    HTable ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    try {
+      byte[] r1 = Bytes.toBytes("row-1");
+      byte[] r2 = Bytes.toBytes("row-2");
+      byte[] r3 = Bytes.toBytes("row-3");
+      putToTable(ht, r1);
+      putToTable(ht, r2);
+      putToTable(ht, r3);
+      RegionServerWithScanTimeout.seqNoToSleepOn = 1;
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      ResultScanner scanner = ht.getScanner(scan);
+      try {
+        Result result = scanner.next();
+        assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
+        long t1 = System.currentTimeMillis();
+        result = scanner.next();
+        // The second next() is the stalled call, so it has to outlast the RPC timeout.
+        assertTrue((System.currentTimeMillis() - t1) > rpcTimeout);
+        assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
+        RegionServerWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
+        result = scanner.next();
+        assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
+      } finally {
+        // Release the scanner even when an assertion above fails.
+        scanner.close();
+      }
+    } finally {
+      // Do not leave the stall trigger armed after a failure.
+      RegionServerWithScanTimeout.seqNoToSleepOn = -1;
+      ht.close();
+    }
+  }
+
+  private void putToTable(HTable ht, byte[] rowkey) throws IOException {
+    Put put = new Put(rowkey);
+    put.add(FAMILY, QUALIFIER, VALUE);
+    ht.put(put);
+  }
+
+  /**
+   * Region server that stalls one scan RPC for longer than the client RPC timeout. It is
+   * selected by class name through the HConstants.REGION_SERVER_IMPL setting.
+   */
+  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
+    // The test thread writes seqNoToSleepOn while the server side reads it inside scan(), and
+    // scan() can run on more than one thread, so these fields are volatile for visibility.
+    private volatile long tableScannerId;
+    private volatile boolean slept;
+    private static volatile long seqNoToSleepOn = -1;
+
+    public RegionServerWithScanTimeout(Configuration conf)
+        throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    public ScanResponse scan(final RpcController controller, final ScanRequest request)
+        throws ServiceException {
+      if (request.hasScannerId()) {
+        // Call on an existing scanner: stall at most once, on the configured sequence number.
+        if (!slept && this.tableScannerId == request.getScannerId()
+            && seqNoToSleepOn == request.getNextCallSeq()) {
+          try {
+            Thread.sleep(rpcTimeout + 500);
+          } catch (InterruptedException e) {
+            // Restore the interrupt status instead of silently dropping it.
+            Thread.currentThread().interrupt();
+          }
+          slept = true;
+        }
+        return super.scan(controller, request);
+      } else {
+        // Request without a scanner id: record the id returned for the user table, skipping
+        // regions whose name marks them as -ROOT- or .META.
+        ScanResponse scanRes = super.scan(controller, request);
+        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
+        if (!regionName.contains("-ROOT-") && !regionName.contains(".META.")) {
+          tableScannerId = scanRes.getScannerId();
+        }
+        return scanRes;
+      }
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message