hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r749715 - in /hadoop/hbase/branches/0.19: ./ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/regionserver/transactional/
Date Tue, 03 Mar 2009 19:50:19 GMT
Author: stack
Date: Tue Mar  3 19:50:19 2009
New Revision: 749715

URL: http://svn.apache.org/viewvc?rev=749715&view=rev
Log:
HBASE-1233 Transactional fixes: Overly conservative scan read-set, potential CME

Modified:
    hadoop/hbase/branches/0.19/CHANGES.txt
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/package.html
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java

Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=749715&r1=749714&r2=749715&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Tue Mar  3 19:50:19 2009
@@ -15,6 +15,8 @@
    HBASE-1142  Cleanup thrift server; remove Text and profuse DEBUG messaging
                (Tim Sell via Stack)
    HBASE-1211  NPE in retries exhausted exception
+   HBASE-1233  Transactional fixes: Overly conservative scan read-set,
+               potential CME (Clint Morgan via Stack)
 
   IMPROVEMENTS
    HBASE-845   HCM.isTableEnabled doesn't really tell if it is, or not

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/package.html?rev=749715&r1=749714&r2=749715&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/package.html (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/package.html Tue Mar  3 19:50:19 2009
@@ -42,18 +42,20 @@
 <i>hbase.regionserver.impl </i> to
 <i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>
 
+<p>
+The read set claimed by a transactional scanner is determined by the start and
+end keys with which the scanner is opened.
+
+
+
 <h3> Known Issues </h3>
 
 Recovery in the face of hregion server failure
 is not fully implemented. Thus, you cannot rely on the transactional
 properties in the face of node failure.
 
-<p> In order to avoid phantom reads on scanners, scanners currently
-claim a <i>write set</i> for all rows in every regions which they scan
-through. This means that if transaction A writes to a region that
-transaction B is scanning, then there is a conflict (only one
-transacton can be committed). This will occur even if the scanner
-never went over the row that was written.
+
+
  
 </body>
 </html>
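
The paragraph added above describes the new model: a transactional scanner now claims only the key range it was opened with, rather than conflicting with every write in the region as the removed paragraph describes. A minimal standalone sketch of that conflict test; the class and method names here are illustrative and not part of the patch, only org.apache.hadoop.hbase.util.Bytes is the real 0.19 utility, and the comparison mirrors the ScanRange.contains logic added to TransactionState.java below.

import org.apache.hadoop.hbase.util.Bytes;

public class ScanReadSetSketch {

  // Mirrors the range test the patch adds in TransactionState.ScanRange:
  // both bounds are inclusive and a null bound means unbounded on that side.
  static boolean claims(byte[] startRow, byte[] endRow, byte[] writtenRow) {
    if (startRow != null && Bytes.compareTo(writtenRow, startRow) < 0) {
      return false;
    }
    if (endRow != null && Bytes.compareTo(endRow, writtenRow) < 0) {
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    byte[] start = Bytes.toBytes("row-b");
    byte[] end = Bytes.toBytes("row-d");
    // A write inside the scanned range still conflicts ...
    System.out.println(claims(start, end, Bytes.toBytes("row-c"))); // true
    // ... but a write outside it no longer does; previously any write to the
    // region conflicted with any scan over that region.
    System.out.println(claims(start, end, Bytes.toBytes("row-z"))); // false
  }
}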

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java?rev=749715&r1=749714&r2=749715&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java Tue Mar  3 19:50:19 2009
@@ -88,6 +88,14 @@
     return operator;
   }
   
+  /** Get the filters.
+   * 
+   * @return filters
+   */
+  public Set<RowFilterInterface> getFilters() {
+    return filters;
+  }
+  
   /** Add a filter.
    * 
    * @param filter
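
The new getFilters() accessor is what lets region-side code look inside a composite filter; TransactionState.getEndRow() below recurses into MUST_PASS_ALL sets to find a stop row. A usage sketch follows; the RowFilterSet, WhileMatchRowFilter and StopRowFilter constructors are recalled from the 0.19 filter API rather than taken from this diff, so treat them as an assumption.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterSetInspection {
  public static void main(String[] args) {
    // A scan bounded at "row-d": a stop-row filter wrapped in a while-match
    // filter, which is the shape getEndRow() recognizes.
    Set<RowFilterInterface> filters = new HashSet<RowFilterInterface>();
    filters.add(new WhileMatchRowFilter(new StopRowFilter(Bytes.toBytes("row-d"))));

    RowFilterSet set =
        new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL, filters);

    // Before this patch the contained filters were not readable from outside;
    // the new accessor exposes them for inspection.
    for (RowFilterInterface f : set.getFilters()) {
      System.out.println(f.getClass().getSimpleName()); // WhileMatchRowFilter
    }
    System.out.println(set.getOperator()); // MUST_PASS_ALL
  }
}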

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=749715&r1=749714&r2=749715&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Tue Mar  3 19:50:19 2009
@@ -31,6 +31,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
 import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
@@ -58,40 +63,69 @@
     ABORTED
   }
 
+  /**
+   * Simple container of the range of the scanners we've opened. Used to check
+   * for conflicting writes.
+   */
+  private class ScanRange {
+    private byte[] startRow;
+    private byte[] endRow;
+
+    public ScanRange(byte[] startRow, byte[] endRow) {
+      this.startRow = startRow;
+      this.endRow = endRow;
+    }
+
+    /**
+     * Check if this scan range contains the given key.
+     * 
+     * @param rowKey row key to test
+     * @return true if the row key falls within this scan range
+     */
+    public boolean contains(byte[] rowKey) {
+      if (startRow != null && Bytes.compareTo(rowKey, startRow) < 0) {
+        return false;
+      }
+      if (endRow != null && Bytes.compareTo(endRow, rowKey) < 0) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  private final HRegionInfo regionInfo;
   private final long hLogStartSequenceId;
   private final long transactionId;
   private Status status;
   private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
       Bytes.BYTES_COMPARATOR);
   private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
+  private List<ScanRange> scanSet = new LinkedList<ScanRange>();
   private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
   private int startSequenceNumber;
   private Integer sequenceNumber;
-  boolean hasScan = false;
 
-  //TODO: Why don't these methods and the class itself use default access?
-  //      They are only referenced from within this package.
-  
-  public TransactionState(final long transactionId,
-      final long rLogStartSequenceId) {
+  TransactionState(final long transactionId, final long rLogStartSequenceId,
+      HRegionInfo regionInfo) {
     this.transactionId = transactionId;
     this.hLogStartSequenceId = rLogStartSequenceId;
+    this.regionInfo = regionInfo;
     this.status = Status.PENDING;
   }
 
-  public void addRead(final byte[] rowKey) {
+  void addRead(final byte[] rowKey) {
     readSet.add(rowKey);
   }
 
-  public Set<byte[]> getReadSet() {
+  Set<byte[]> getReadSet() {
     return readSet;
   }
 
-  public void addWrite(final BatchUpdate write) {
+  void addWrite(final BatchUpdate write) {
     writeSet.add(write);
   }
 
-  public List<BatchUpdate> getWriteSet() {
+  List<BatchUpdate> getWriteSet() {
     return writeSet;
   }
 
@@ -103,8 +137,8 @@
    * @param timestamp
    * @return
    */
-  public Map<byte[], Cell> localGetFull(final byte[] row,
-      final Set<byte[]> columns, final long timestamp) {
+  Map<byte[], Cell> localGetFull(final byte[] row, final Set<byte[]> columns,
+      final long timestamp) {
     Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
         Bytes.BYTES_COMPARATOR); // Must use the Bytes Conparator because
     for (BatchUpdate b : writeSet) {
@@ -133,8 +167,7 @@
    * @param timestamp
    * @return
    */
-  public Cell[] localGet(final byte[] row, final byte[] column,
-      final long timestamp) {
+  Cell[] localGet(final byte[] row, final byte[] column, final long timestamp) {
     ArrayList<Cell> results = new ArrayList<Cell>();
 
     // Go in reverse order to put newest updates first in list
@@ -158,11 +191,11 @@
         .toArray(new Cell[results.size()]);
   }
 
-  public void addTransactionToCheck(final TransactionState transaction) {
+  void addTransactionToCheck(final TransactionState transaction) {
     transactionsToCheck.add(transaction);
   }
 
-  public boolean hasConflict() {
+  boolean hasConflict() {
     for (TransactionState transactionState : transactionsToCheck) {
       if (hasConflict(transactionState)) {
         return true;
@@ -177,18 +210,22 @@
     }
 
     for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
-      if (this.hasScan) {
-        LOG.info("Transaction" + this.toString()
-            + " has a scan read. Meanwile a write occured. "
-            + "Conservitivly reporting conflict");
-        return true;
-      }
-
       if (this.getReadSet().contains(otherUpdate.getRow())) {
-        LOG.trace("Transaction " + this.toString() + " conflicts with "
-            + checkAgainst.toString());
+        LOG.debug("Transaction [" + this.toString()
+            + "] has read which conflicts with [" + checkAgainst.toString()
+            + "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+            + Bytes.toString(otherUpdate.getRow()) + "]");
         return true;
       }
+      for (ScanRange scanRange : this.scanSet) {
+        if (scanRange.contains(otherUpdate.getRow())) {
+          LOG.debug("Transaction [" + this.toString()
+              + "] has scan which conflicts with [" + checkAgainst.toString()
+              + "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+              + Bytes.toString(otherUpdate.getRow()) + "]");
+          return true;
+        }
+      }
     }
     return false;
   }
@@ -198,7 +235,7 @@
    * 
    * @return Return the status.
    */
-  public Status getStatus() {
+  Status getStatus() {
     return status;
   }
 
@@ -207,7 +244,7 @@
    * 
    * @param status The status to set.
    */
-  public void setStatus(final Status status) {
+  void setStatus(final Status status) {
     this.status = status;
   }
 
@@ -216,7 +253,7 @@
    * 
    * @return Return the startSequenceNumber.
    */
-  public int getStartSequenceNumber() {
+  int getStartSequenceNumber() {
     return startSequenceNumber;
   }
 
@@ -225,7 +262,7 @@
    * 
    * @param startSequenceNumber.
    */
-  public void setStartSequenceNumber(final int startSequenceNumber) {
+  void setStartSequenceNumber(final int startSequenceNumber) {
     this.startSequenceNumber = startSequenceNumber;
   }
 
@@ -234,7 +271,7 @@
    * 
    * @return Return the sequenceNumber.
    */
-  public Integer getSequenceNumber() {
+  Integer getSequenceNumber() {
     return sequenceNumber;
   }
 
@@ -243,7 +280,7 @@
    * 
    * @param sequenceNumber The sequenceNumber to set.
    */
-  public void setSequenceNumber(final Integer sequenceNumber) {
+  void setSequenceNumber(final Integer sequenceNumber) {
     this.sequenceNumber = sequenceNumber;
   }
 
@@ -256,6 +293,8 @@
     result.append(status.name());
     result.append(" read Size: ");
     result.append(readSet.size());
+    result.append(" scan Size: ");
+    result.append(scanSet.size());
     result.append(" write Size: ");
     result.append(writeSet.size());
     result.append(" startSQ: ");
@@ -274,7 +313,7 @@
    * 
    * @return Return the transactionId.
    */
-  public long getTransactionId() {
+  long getTransactionId() {
     return transactionId;
   }
 
@@ -283,17 +322,41 @@
    * 
    * @return Return the startSequenceId.
    */
-  public long getHLogStartSequenceId() {
+  long getHLogStartSequenceId() {
     return hLogStartSequenceId;
   }
 
-  /**
-   * Set the hasScan.
-   * 
-   * @param hasScan The hasScan to set.
-   */
-  public void setHasScan(final boolean hasScan) {
-    this.hasScan = hasScan;
+  void addScan(byte[] firstRow, RowFilterInterface filter) {
+    ScanRange scanRange = new ScanRange(firstRow, getEndRow(filter));
+    LOG.trace(String.format(
+        "Adding scan for transcaction [%s], from [%s] to [%s]", transactionId,
+        scanRange.startRow == null ? "null" : Bytes
+            .toString(scanRange.startRow), scanRange.endRow == null ? "null"
+            : Bytes.toString(scanRange.endRow)));
+    scanSet.add(scanRange);
+  }
+
+  private byte[] getEndRow(RowFilterInterface filter) {
+    if (filter instanceof WhileMatchRowFilter) {
+      WhileMatchRowFilter wmrFilter = (WhileMatchRowFilter) filter;
+      if (wmrFilter.getInternalFilter() instanceof StopRowFilter) {
+        StopRowFilter stopFilter = (StopRowFilter) wmrFilter
+            .getInternalFilter();
+        return stopFilter.getStopRowKey();
+      }
+    } else if (filter instanceof RowFilterSet) {
+      RowFilterSet rowFilterSet = (RowFilterSet) filter;
+      if (rowFilterSet.getOperator()
+          .equals(RowFilterSet.Operator.MUST_PASS_ALL)) {
+        for (RowFilterInterface subFilter : rowFilterSet.getFilters()) {
+          byte[] endRow = getEndRow(subFilter);
+          if (endRow != null) {
+            return endRow;
+          }
+        }
+      }
+    }
+    return null;
   }
 
 }
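
A short sketch of what the end-row derivation above implies for scan claims; endRowOf() mirrors the private getEndRow() but omits the RowFilterSet recursion, the class and method names are illustrative only, and the filter constructors are assumed from the 0.19 API. Filters the code cannot interpret yield null, so such scans still claim everything from their first row onward, which stays conservative but correct.

import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class EndRowSketch {

  // Same shape as getEndRow() above: a WhileMatchRowFilter wrapping a
  // StopRowFilter yields a concrete end row, anything else yields null.
  static byte[] endRowOf(RowFilterInterface filter) {
    if (filter instanceof WhileMatchRowFilter) {
      RowFilterInterface inner = ((WhileMatchRowFilter) filter).getInternalFilter();
      if (inner instanceof StopRowFilter) {
        return ((StopRowFilter) inner).getStopRowKey();
      }
    }
    return null; // unbounded: the scan claims [firstRow, end of region)
  }

  public static void main(String[] args) {
    RowFilterInterface bounded =
        new WhileMatchRowFilter(new StopRowFilter(Bytes.toBytes("row-d")));
    System.out.println(Bytes.toString(endRowOf(bounded))); // row-d
    System.out.println(endRowOf(null));                    // null -> unbounded
  }
}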

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=749715&r1=749714&r2=749715&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Tue Mar  3 19:50:19 2009
@@ -180,14 +180,16 @@
         alias.setStatus(Status.ABORTED);
         retireTransaction(alias);
       }
+      LOG.error("Existing trasaction with id ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");
       throw new IOException("Already exiting transaction id: " + key);
     }
 
     TransactionState state = new TransactionState(transactionId, super.getLog()
-        .getSequenceNumber());
+        .getSequenceNumber(), super.getRegionInfo());
 
-    // Order is important here
-    for (TransactionState commitPending : commitPendingTransactions) {
+    // Order is important here ...
+    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(commitPendingTransactions);
+    for (TransactionState commitPending : commitPendingCopy) {
       state.addTransactionToCheck(commitPending);
     }
     state.setStartSequenceNumber(nextSequenceId.get());
@@ -196,6 +198,7 @@
     try {
       transactionLeases.createLease(key, new TransactionLeaseListener(key));
     } catch (LeaseStillHeldException e) {
+      LOG.error("Lease still held for ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");
     
       throw new RuntimeException(e);
     }
     LOG.debug("Begining transaction " + key + " in region "
@@ -337,6 +340,8 @@
   public InternalScanner getScanner(final long transactionId,
       final byte[][] cols, final byte[] firstRow, final long timestamp,
       final RowFilterInterface filter) throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+    state.addScan(firstRow, filter);
     return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
         timestamp, filter));
   }
@@ -578,14 +583,31 @@
       numRemoved++;
     }
 
-    if (numRemoved > 0) {
-      LOG.debug("Removed " + numRemoved
-          + " commited transactions with sequence lower than "
-          + minStartSeqNumber + ". Still have "
-          + commitedTransactionsBySequenceNumber.size() + " left");
-    } else if (commitedTransactionsBySequenceNumber.size() > 0) {
-      LOG.debug("Could not remove any transactions, and still have "
-          + commitedTransactionsBySequenceNumber.size() + " left");
+    if (LOG.isDebugEnabled()) {
+      StringBuilder debugMessage = new StringBuilder();
+      if (numRemoved > 0) {
+        debugMessage.append("Removed ").append(numRemoved).append(
+            " commited transactions");
+
+        if (minStartSeqNumber == Integer.MAX_VALUE) {
+          debugMessage.append("with any sequence number");
+        } else {
+          debugMessage.append("with sequence lower than ").append(
+              minStartSeqNumber).append(".");
+        }
+        if (!commitedTransactionsBySequenceNumber.isEmpty()) {
+          debugMessage.append(" Still have ").append(
+              commitedTransactionsBySequenceNumber.size()).append(" left.");
+        } else {
+          debugMessage.append("None left.");
+        }
+        LOG.debug(debugMessage.toString());
+      } else if (commitedTransactionsBySequenceNumber.size() > 0) {
+        debugMessage.append(
+            "Could not remove any transactions, and still have ").append(
+            commitedTransactionsBySequenceNumber.size()).append(" left");
+        LOG.debug(debugMessage.toString());
+      }
     }
   }
 
@@ -647,9 +669,11 @@
     /**
      * @param transactionId
      * @param scanner
+     * @throws UnknownTransactionException
      */
     public ScannerWrapper(final long transactionId,
-        final InternalScanner scanner) {
+        final InternalScanner scanner) throws UnknownTransactionException {
+
       this.transactionId = transactionId;
       this.scanner = scanner;
     }
@@ -670,10 +694,6 @@
         final SortedMap<byte[], Cell> results) throws IOException {
       boolean result = scanner.next(key, results);
       TransactionState state = getTransactionState(transactionId);
-      state.setHasScan(true);
-      // FIXME, not using row, just claiming read over the whole region. We are
-      // being very conservative on scans to avoid phantom reads.
-      state.addRead(key.getRow());
 
       if (result) {
         Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,
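
The potential ConcurrentModificationException (CME) half of HBASE-1233 is the change above that iterates a copy of commitPendingTransactions instead of the live collection. A self-contained, JDK-only sketch of that pattern; the real hazard involves another thread retiring a committed transaction mid-iteration, which this toy collapses into a single thread for brevity.

import java.util.LinkedList;
import java.util.List;

public class CopyBeforeIterate {
  public static void main(String[] args) {
    List<String> commitPending = new LinkedList<String>();
    commitPending.add("txn-1");
    commitPending.add("txn-2");
    commitPending.add("txn-3");

    // Iterating the live list while it is being modified can throw
    // ConcurrentModificationException. Iterating a snapshot, as the patch
    // now does for commitPendingTransactions, cannot.
    List<String> snapshot = new LinkedList<String>(commitPending);
    for (String txn : snapshot) {
      commitPending.remove(txn); // safe: we are not iterating commitPending itself
    }
    System.out.println("still pending: " + commitPending); // []
  }
}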


