hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r774570 - in /hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase: client/transactional/ ipc/ regionserver/ regionserver/tableindexed/ regionserver/transactional/
Date Wed, 13 May 2009 22:35:55 GMT
Author: stack
Date: Wed May 13 22:35:55 2009
New Revision: 774570

URL: http://svn.apache.org/viewvc?rev=774570&view=rev
Log:
HBASE-1418 Transacitonal improvments and fixes

Modified:
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java Wed May 13 22:35:55 2009
@@ -20,15 +20,19 @@
 package org.apache.hadoop.hbase.client.transactional;
 
 import java.io.IOException;
+import java.util.Iterator;
+
+import javax.transaction.xa.XAResource;
+
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Transaction Manager. Responsible for committing transactions.
@@ -39,6 +43,7 @@
 
   private final HConnection connection;
   private final TransactionLogger transactionLogger;
+  private JtaXAResource xAResource;
 
   /**
    * @param conf
@@ -67,26 +72,43 @@
     LOG.debug("Begining transaction " + transactionId);
     return new TransactionState(transactionId);
   }
-
+  
   /**
-   * Try and commit a transaction.
+   * Prepare to commit a transaction.
    * 
    * @param transactionState
+   * @return commitStatusCode (see {@link TransactionalRegionInterface})
    * @throws IOException
    * @throws CommitUnsuccessfulException
    */
-  public void tryCommit(final TransactionState transactionState)
+  public int prepareCommit(final TransactionState transactionState)
       throws CommitUnsuccessfulException, IOException {
-    LOG.debug("atempting to commit trasaction: " + transactionState.toString());
-
+    boolean allReadOnly = true;
     try {
-      for (HRegionLocation location : transactionState
-          .getParticipatingRegions()) {
+      Iterator<HRegionLocation> locationIterator = transactionState.getParticipatingRegions().iterator();
+      while (locationIterator.hasNext()) {
+        HRegionLocation location = locationIterator.next();
         TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
             .getHRegionConnection(location.getServerAddress());
-        boolean canCommit = transactionalRegionServer.commitRequest(location
+        int commitStatus = transactionalRegionServer.commitRequest(location
             .getRegionInfo().getRegionName(), transactionState
             .getTransactionId());
+        boolean canCommit = true;
+        switch (commitStatus) {
+        case TransactionalRegionInterface.COMMIT_OK:
+          allReadOnly = false;
+          break;
+        case TransactionalRegionInterface.COMMIT_OK_READ_ONLY:
+          locationIterator.remove(); // No need to doCommit for read-onlys
+          break;
+        case TransactionalRegionInterface.COMMIT_UNSUCESSFUL:
+          canCommit = false;
+          break;
+        default:
+          throw new CommitUnsuccessfulException(
+              "Unexpected return code from prepareCommit: " + commitStatus);
+        }        
+        
         if (LOG.isTraceEnabled()) {
           LOG.trace("Region ["
               + location.getRegionInfo().getRegionNameAsString() + "] votes "
@@ -100,7 +122,48 @@
           throw new CommitUnsuccessfulException();
         }
       }
+    } catch (Exception e) {
+      LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
+          + "] was unsucsessful", e);
+      // This happens on a NSRE that is triggered by a split
+      // FIXME, but then abort fails
+      try {
+        abort(transactionState);
+      } catch (Exception abortException) {
+        LOG.warn("Exeption durring abort", abortException);
+      }
+      throw new CommitUnsuccessfulException(e);
+    }
+    return allReadOnly ? TransactionalRegionInterface.COMMIT_OK_READ_ONLY : TransactionalRegionInterface.COMMIT_OK;
+  }
 
+  /**
+   * Try and commit a transaction. This does both phases of the 2-phase protocol: prepare and commit.
+   * 
+   * @param transactionState
+   * @throws IOException
+   * @throws CommitUnsuccessfulException
+   */
+  public void tryCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException, IOException {
+    long startTime = System.currentTimeMillis();
+    LOG.debug("atempting to commit trasaction: " + transactionState.toString());
+    int status = prepareCommit(transactionState);
+    
+    if (status == TransactionalRegionInterface.COMMIT_OK) {
+      doCommit(transactionState);
+    }
+    LOG.debug("Committed transaction ["+transactionState.getTransactionId()+"] in ["+((System.currentTimeMillis()-startTime))+"]ms");
+  }
+
+  /** Do the commit. This is the 2nd phase of the 2-phase protocol.
+   * 
+   * @param transactionState
+   * @throws CommitUnsuccessfulException
+   */
+  public void doCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException{
+    try {
       LOG.debug("Commiting [" + transactionState.getTransactionId() + "]");
 
       transactionLogger.setStatusForTransaction(transactionState
@@ -113,13 +176,19 @@
         transactionalRegionServer.commit(location.getRegionInfo()
             .getRegionName(), transactionState.getTransactionId());
       }
-    } catch (RemoteException e) {
+    } catch (Exception e) {
       LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
           + "] was unsucsessful", e);
-      // FIXME, think about the what ifs
+      // This happens on a NSRE that is triggered by a split
+      // FIXME, but then abort fails
+      try {
+        abort(transactionState);
+      } catch (Exception abortException) {
+        LOG.warn("Exeption durring abort", abortException);
+      }
       throw new CommitUnsuccessfulException(e);
     }
-    // Tran log can be deleted now ...
+    // TODO: Transaction log can be deleted now ...
   }
 
   /**
@@ -141,12 +210,33 @@
       if (locationToIgnore != null && location.equals(locationToIgnore)) {
         continue;
       }
+      try {
+        TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+            .getHRegionConnection(location.getServerAddress());
 
-      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
-          .getHRegionConnection(location.getServerAddress());
-
-      transactionalRegionServer.abort(location.getRegionInfo().getRegionName(),
-          transactionState.getTransactionId());
+        transactionalRegionServer.abort(location.getRegionInfo()
+            .getRegionName(), transactionState.getTransactionId());
+      } catch (UnknownTransactionException e) {
+        LOG
+            .debug("Got unknown transaciton exception durring abort. Transaction: ["
+                + transactionState.getTransactionId()
+                + "], region: ["
+                + location.getRegionInfo().getRegionNameAsString()
+                + "]. Ignoring.");
+      } catch (NotServingRegionException e) {
+        LOG
+            .debug("Got NSRE durring abort. Transaction: ["
+                + transactionState.getTransactionId() + "], region: ["
+                + location.getRegionInfo().getRegionNameAsString()
+                + "]. Ignoring.");
+      }
+    }
+  }
+  
+  public synchronized JtaXAResource getXAResource() {
+    if (xAResource == null){
+      xAResource = new JtaXAResource(this);
     }
+    return xAResource;
   }
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java Wed May 13 22:35:55 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Table with transactional support.
@@ -47,7 +48,7 @@
    */
   public TransactionalTable(final HBaseConfiguration conf,
       final String tableName) throws IOException {
-    super(conf, tableName);
+    this(conf, Bytes.toBytes(tableName));
   }
 
   /**
@@ -386,7 +387,7 @@
    * @param batchUpdate
    * @throws IOException
    */
-  public synchronized void commit(final TransactionState transactionState,
+  public void commit(final TransactionState transactionState,
       final BatchUpdate batchUpdate) throws IOException {
     super.getConnection().getRegionServerWithRetries(
         new TransactionalServerCallable<Boolean>(super.getConnection(), super

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java Wed May 13 22:35:55 2009
@@ -27,12 +27,20 @@
 /**
  * Interface for transactional region servers.
  * 
- * <p>NOTE: if you change the interface, you must change the RPC version
- * number in HBaseRPCProtocolVersion
+ * <p>
+ * NOTE: if you change the interface, you must change the RPC version number in
+ * HBaseRPCProtocolVersion
  * 
  */
 public interface TransactionalRegionInterface extends HRegionInterface {
 
+  /** Status code representing a transaction that can be committed. */
+  int COMMIT_OK = 1;
+  /** Status code representing a read-only transaction that can be committed. */
+  int COMMIT_OK_READ_ONLY = 2;
+  /** Status code representing a transaction that cannot be committed. */
+  int COMMIT_UNSUCESSFUL = 3;
+  
   /**
    * Sent to initiate a transaction.
    * 
@@ -40,13 +48,13 @@
    * @param regionName name of region
    * @throws IOException
    */
-  public void beginTransaction(long transactionId, final byte[] regionName)
+  void beginTransaction(long transactionId, final byte[] regionName)
       throws IOException;
 
   /**
    * Retrieve a single value from the specified region for the specified row and
    * column keys
-   *
+   * 
    * @param transactionId
    * @param regionName name of region
    * @param row row key
@@ -54,8 +62,8 @@
    * @return alue for that region/row/column
    * @throws IOException
    */
-  public Cell get(long transactionId, final byte[] regionName,
-      final byte[] row, final byte[] column) throws IOException;
+  Cell get(long transactionId, final byte[] regionName, final byte[] row,
+      final byte[] column) throws IOException;
 
   /**
    * Get the specified number of versions of the specified row and column
@@ -68,9 +76,8 @@
    * @return array of values
    * @throws IOException
    */
-  public Cell[] get(long transactionId, final byte[] regionName,
-      final byte[] row, final byte[] column, final int numVersions)
-      throws IOException;
+  Cell[] get(long transactionId, final byte[] regionName, final byte[] row,
+      final byte[] column, final int numVersions) throws IOException;
 
   /**
    * Get the specified number of versions of the specified row and column with
@@ -85,9 +92,9 @@
    * @return array of values
    * @throws IOException
    */
-  public Cell[] get(long transactionId, final byte[] regionName,
-      final byte[] row, final byte[] column, final long timestamp,
-      final int numVersions) throws IOException;
+  Cell[] get(long transactionId, final byte[] regionName, final byte[] row,
+      final byte[] column, final long timestamp, final int numVersions)
+      throws IOException;
 
   /**
    * Get all the data for the specified row at a given timestamp
@@ -99,7 +106,7 @@
    * @return map of values
    * @throws IOException
    */
-  public RowResult getRow(long transactionId, final byte[] regionName,
+  RowResult getRow(long transactionId, final byte[] regionName,
       final byte[] row, final long ts) throws IOException;
 
   /**
@@ -113,7 +120,7 @@
    * @return map of values
    * @throws IOException
    */
-  public RowResult getRow(long transactionId, final byte[] regionName,
+  RowResult getRow(long transactionId, final byte[] regionName,
       final byte[] row, final byte[][] columns, final long ts)
       throws IOException;
 
@@ -127,7 +134,7 @@
    * @return map of values
    * @throws IOException
    */
-  public RowResult getRow(long transactionId, final byte[] regionName,
+  RowResult getRow(long transactionId, final byte[] regionName,
       final byte[] row, final byte[][] columns) throws IOException;
 
   /**
@@ -140,7 +147,7 @@
    * @param timestamp Delete all entries that have this timestamp or older
    * @throws IOException
    */
-  public void deleteAll(long transactionId, byte[] regionName, byte[] row,
+  void deleteAll(long transactionId, byte[] regionName, byte[] row,
       long timestamp) throws IOException;
 
   /**
@@ -160,7 +167,7 @@
    * @return scannerId scanner identifier used in other calls
    * @throws IOException
    */
-  public long openScanner(final long transactionId, final byte[] regionName,
+  long openScanner(final long transactionId, final byte[] regionName,
       final byte[][] columns, final byte[] startRow, long timestamp,
       RowFilterInterface filter) throws IOException;
 
@@ -172,37 +179,47 @@
    * @param b BatchUpdate
    * @throws IOException
    */
-  public void batchUpdate(long transactionId, final byte[] regionName,
+  void batchUpdate(long transactionId, final byte[] regionName,
       final BatchUpdate b) throws IOException;
 
   /**
    * Ask if we can commit the given transaction.
-   *
+   * 
    * @param regionName
    * @param transactionId
-   * @return true if we can commit
+   * @return status of COMMIT_OK, COMMIT_READ_ONLY, or COMMIT_UNSUSESSFULL
    * @throws IOException
    */
-  public boolean commitRequest(final byte[] regionName, long transactionId)
+  int commitRequest(final byte[] regionName, long transactionId)
       throws IOException;
 
   /**
-   * Commit the transaction.
-   *
+   * Try to commit the given transaction. This is used when there is only one
+   * participating region.
+   * 
    * @param regionName
    * @param transactionId
+   * @return true if committed
    * @throws IOException
    */
-  public void commit(final byte[] regionName, long transactionId)
+  boolean commitIfPossible(final byte[] regionName, long transactionId)
       throws IOException;
 
   /**
+   * Commit the transaction.
+   * 
+   * @param regionName
+   * @param transactionId
+   * @throws IOException
+   */
+  void commit(final byte[] regionName, long transactionId) throws IOException;
+
+  /**
    * Abort the transaction.
-   *
+   * 
    * @param regionName
    * @param transactionId
    * @throws IOException
    */
-  public void abort(final byte[] regionName, long transactionId)
-      throws IOException;
+  void abort(final byte[] regionName, long transactionId) throws IOException;
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May 13 22:35:55 2009
@@ -379,7 +379,8 @@
    * 
    * @throws IOException
    */
-  List<HStoreFile> close(boolean abort) throws IOException {
+  public List<HStoreFile> close(final boolean abort) throws IOException {
+
     if (isClosed()) {
       LOG.warn("region " + this + " already closed");
       return null;
@@ -533,6 +534,7 @@
    * @throws IOException
    */
   HRegion[] splitRegion(final byte [] midKey) throws IOException {
+    prepareToSplit();
     synchronized (splitLock) {
       if (closed.get()) {
         return null;
@@ -628,6 +630,10 @@
     }
   }
   
+  protected void prepareToSplit() {
+    // nothing
+  }
+  
   /*
    * @param dir
    * @return compaction directory for the passed in <code>dir</code>

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May 13 22:35:55 2009
@@ -1395,7 +1395,7 @@
     getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
   }
 
-  void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
+  protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
   throws IOException {
     HRegion region = this.removeFromOnlineRegions(hri);
     if (region != null) {

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Wed May 13 22:35:55 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
@@ -59,8 +60,8 @@
 
   public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs,
       final HBaseConfiguration conf, final HRegionInfo regionInfo,
-      final FlushRequester flushListener) {
-    super(basedir, log, fs, conf, regionInfo, flushListener);
+      final FlushRequester flushListener, Leases transactionLeases) {
+    super(basedir, log, fs, conf, regionInfo, flushListener, transactionLeases);
     this.conf = conf;
   }
 

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java Wed May 13 22:35:55 2009
@@ -62,7 +62,7 @@
       throws IOException {
     HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
         .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
-        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
+        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester(), super.getLeases());
     r.initialize(null, new Progressable() {
       public void progress() {
         addProcessingMessage(regionInfo);

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Wed May 13 22:35:55 2009
@@ -31,7 +31,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ColumnNameParseException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.RowFilterSet;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
@@ -91,6 +93,13 @@
       }
       return true;
     }
+    
+    @Override
+    public String toString() {
+      return "startRow: "
+          + (startRow == null ? "null" : Bytes.toString(startRow))
+          + ", endRow: " + (endRow == null ? "null" : Bytes.toString(endRow));
+    }
   }
 
   private final HRegionInfo regionInfo;
@@ -104,6 +113,7 @@
   private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
   private int startSequenceNumber;
   private Integer sequenceNumber;
+  private int commitPendingWaits = 0;
 
   TransactionState(final long transactionId, final long rLogStartSequenceId,
       HRegionInfo regionInfo) {
@@ -140,7 +150,7 @@
   Map<byte[], Cell> localGetFull(final byte[] row, final Set<byte[]> columns,
       final long timestamp) {
     Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
-        Bytes.BYTES_COMPARATOR); // Must use the Bytes Conparator because
+        Bytes.BYTES_COMPARATOR); 
     for (BatchUpdate b : writeSet) {
       if (!Bytes.equals(row, b.getRow())) {
         continue;
@@ -150,7 +160,7 @@
       }
       for (BatchOperation op : b) {
         if (!op.isPut()
-            || (columns != null && !columns.contains(op.getColumn()))) {
+            || (columns != null && !columnInColumns(op.getColumn(), columns))) {
           continue;
         }
         results.put(op.getColumn(), new Cell(op.getValue(), b.getTimestamp()));
@@ -159,6 +169,14 @@
     return results.size() == 0 ? null : results;
   }
 
+  private boolean columnInColumns(byte [] column, Set<byte []> columns) {
+    try {
+      return columns.contains(column) || columns.contains(Bytes.add(HStoreKey.getFamily(column), Bytes.toBytes(":")));
+    } catch (ColumnNameParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
   /**
    * Get from the writeSet.
    * 
@@ -221,7 +239,8 @@
         if (scanRange.contains(otherUpdate.getRow())) {
           LOG.debug("Transaction [" + this.toString()
               + "] has scan which conflicts with [" + checkAgainst.toString()
-              + "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+              + "]: region [" + regionInfo.getRegionNameAsString() + "], scanRange[" +
+              scanRange.toString()+"] ,row["
               + Bytes.toString(otherUpdate.getRow()) + "]");
           return true;
         }
@@ -358,5 +377,13 @@
     }
     return null;
   }
+  
+  int getCommitPendingWaits() {
+    return commitPendingWaits;
+  }
+  
+  void incrementCommitPendingWaits() {
+    this.commitPendingWaits++;
+  }
 
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Wed May 13 22:35:55 2009
@@ -48,10 +48,13 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -75,13 +78,11 @@
  */
 public class TransactionalRegion extends HRegion {
 
-  private static final String LEASE_TIME = "hbase.transaction.leaseTime";
-  private static final int DEFAULT_LEASE_TIME = 60 * 1000;
-  private static final int LEASE_CHECK_FREQUENCY = 1000;
-  
   private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush";
-  private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100; // Do a flush if we have this many old transactions..
-  
+  private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100; // Do a flush if
+  // we have this
+  // many old
+  // transactions..
 
   private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
 
@@ -97,11 +98,11 @@
   private Set<TransactionState> commitPendingTransactions = Collections
       .synchronizedSet(new HashSet<TransactionState>());
 
-  private final Leases transactionLeases;
   private AtomicInteger nextSequenceId = new AtomicInteger(0);
   private Object commitCheckLock = new Object();
   private TransactionalHLogManager logManager;
   private final int oldTransactionFlushTrigger;
+  private final Leases transactionLeases;
 
   /**
    * @param basedir
@@ -113,12 +114,13 @@
    */
   public TransactionalRegion(final Path basedir, final HLog log,
       final FileSystem fs, final HBaseConfiguration conf,
-      final HRegionInfo regionInfo, final FlushRequester flushListener) {
+      final HRegionInfo regionInfo, final FlushRequester flushListener,
+      final Leases transactionalLeases) {
     super(basedir, log, fs, conf, regionInfo, flushListener);
-    transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
-        LEASE_CHECK_FREQUENCY);
     logManager = new TransactionalHLogManager(this);
-    oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH, DEFAULT_OLD_TRANSACTION_FLUSH);
+    oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH,
+        DEFAULT_OLD_TRANSACTION_FLUSH);
+    this.transactionLeases = transactionalLeases;
   }
 
   @Override
@@ -160,8 +162,14 @@
    */
   @Override
   protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
+    LinkedList<TransactionState> transactionStates;
+    synchronized (transactionsById) {
+      transactionStates = new LinkedList<TransactionState>(transactionsById
+          .values());
+    }
+
     long minPendingStartSequenceId = currentSequenceId;
-    for (TransactionState transactionState : transactionsById.values()) {
+    for (TransactionState transactionState : transactionStates) {
       minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
           transactionState.getHLogStartSequenceId());
     }
@@ -173,6 +181,7 @@
    * @throws IOException
    */
   public void beginTransaction(final long transactionId) throws IOException {
+    checkClosing();
     String key = String.valueOf(transactionId);
     if (transactionsById.get(key) != null) {
       TransactionState alias = getTransactionState(transactionId);
@@ -180,7 +189,8 @@
         alias.setStatus(Status.ABORTED);
         retireTransaction(alias);
       }
-      LOG.error("Existing trasaction with id ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");
+      LOG.error("Existing trasaction with id [" + key + "] in region ["
+          + super.getRegionInfo().getRegionNameAsString() + "]");
       throw new IOException("Already exiting transaction id: " + key);
     }
 
@@ -188,26 +198,35 @@
         .getSequenceNumber(), super.getRegionInfo());
 
     // Order is important here ...
-    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(commitPendingTransactions);
+    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(
+        commitPendingTransactions);
     for (TransactionState commitPending : commitPendingCopy) {
       state.addTransactionToCheck(commitPending);
     }
     state.setStartSequenceNumber(nextSequenceId.get());
 
-    transactionsById.put(String.valueOf(key), state);
+    synchronized (transactionsById) {
+      transactionsById.put(key, state);
+    }
     try {
-      transactionLeases.createLease(key, new TransactionLeaseListener(key));
+      transactionLeases.createLease(getLeaseId(transactionId),
+          new TransactionLeaseListener(key));
     } catch (LeaseStillHeldException e) {
-      LOG.error("Lease still held for ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");      
+      LOG.error("Lease still held for [" + key + "] in region ["
+          + super.getRegionInfo().getRegionNameAsString() + "]");
       throw new RuntimeException(e);
     }
     LOG.debug("Begining transaction " + key + " in region "
         + super.getRegionInfo().getRegionNameAsString());
     logManager.writeStartToLog(transactionId);
-    
+
     maybeTriggerOldTransactionFlush();
   }
 
+  private String getLeaseId(long transactionId) {
+    return super.getRegionInfo().getRegionNameAsString() + transactionId;
+  }
+
   /**
    * Fetch a single data item.
    * 
@@ -219,6 +238,7 @@
    */
   public Cell get(final long transactionId, final byte[] row,
       final byte[] column) throws IOException {
+    checkClosing();
     Cell[] results = get(transactionId, row, column, 1);
     return (results == null || results.length == 0) ? null : results[0];
   }
@@ -235,6 +255,7 @@
    */
   public Cell[] get(final long transactionId, final byte[] row,
       final byte[] column, final int numVersions) throws IOException {
+    checkClosing();
     return get(transactionId, row, column, Long.MAX_VALUE, numVersions);
   }
 
@@ -252,6 +273,8 @@
   public Cell[] get(final long transactionId, final byte[] row,
       final byte[] column, final long timestamp, final int numVersions)
       throws IOException {
+    checkClosing();
+
     TransactionState state = getTransactionState(transactionId);
 
     state.addRead(row);
@@ -294,6 +317,7 @@
    * @return Map<columnName, Cell> values
    * @throws IOException
    */
+
   public Map<byte[], Cell> getFull(final long transactionId, final byte[] row,
       final Set<byte[]> columns, final long ts) throws IOException {
     TransactionState state = getTransactionState(transactionId);
@@ -312,7 +336,8 @@
         LOG.trace("cell: " + Bytes.toString(entry.getValue().getValue()));
       }
 
-      Map<byte[], Cell> internalResults = getFull(row, columns, ts, 1, null);
+      HbaseMapWritable<byte[], Cell> internalResults = getFull(row, columns,
+          ts, 1, null);
       internalResults.putAll(localCells);
       return internalResults;
     }
@@ -340,6 +365,8 @@
   public InternalScanner getScanner(final long transactionId,
       final byte[][] cols, final byte[] firstRow, final long timestamp,
       final RowFilterInterface filter) throws IOException {
+    checkClosing();
+
     TransactionState state = getTransactionState(transactionId);
     state.addScan(firstRow, filter);
     return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
@@ -355,6 +382,8 @@
    */
   public void batchUpdate(final long transactionId, final BatchUpdate b)
       throws IOException {
+    checkClosing();
+
     TransactionState state = getTransactionState(transactionId);
     state.addWrite(b);
     logManager.writeUpdateToLog(transactionId, b);
@@ -371,18 +400,22 @@
    */
   public void deleteAll(final long transactionId, final byte[] row,
       final long timestamp) throws IOException {
+    checkClosing();
+
     TransactionState state = getTransactionState(transactionId);
     long now = System.currentTimeMillis();
 
+
     for (HStore store : super.stores.values()) {
       List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
           ALL_VERSIONS, now, null);
+
       BatchUpdate deleteUpdate = new BatchUpdate(row, timestamp);
 
       for (HStoreKey key : keys) {
         deleteUpdate.delete(key.getColumn());
       }
-      
+
       state.addWrite(deleteUpdate);
       logManager.writeUpdateToLog(transactionId, deleteUpdate);
 
@@ -392,39 +425,61 @@
 
   /**
    * @param transactionId
-   * @return true if commit is successful
+   * @return TransactionRegionInterface commit code
    * @throws IOException
    */
-  public boolean commitRequest(final long transactionId) throws IOException {
+  public int commitRequest(final long transactionId) throws IOException {
+    checkClosing();
+
     synchronized (commitCheckLock) {
       TransactionState state = getTransactionState(transactionId);
       if (state == null) {
-        return false;
+        return TransactionalRegionInterface.COMMIT_UNSUCESSFUL;
       }
 
       if (hasConflict(state)) {
         state.setStatus(Status.ABORTED);
         retireTransaction(state);
-        return false;
+        return TransactionalRegionInterface.COMMIT_UNSUCESSFUL;
       }
 
       // No conflicts, we can commit.
       LOG.trace("No conflicts for transaction " + transactionId
           + " found in region " + super.getRegionInfo().getRegionNameAsString()
           + ". Voting for commit");
-      state.setStatus(Status.COMMIT_PENDING);
 
       // If there are writes we must keep record off the transaction
       if (state.getWriteSet().size() > 0) {
         // Order is important
+        state.setStatus(Status.COMMIT_PENDING);
         commitPendingTransactions.add(state);
         state.setSequenceNumber(nextSequenceId.getAndIncrement());
         commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
             state);
+        return TransactionalRegionInterface.COMMIT_OK;
       }
+      // Otherwise we were read-only and commitable, so we can forget it.
+      state.setStatus(Status.COMMITED);
+      retireTransaction(state);
+      return TransactionalRegionInterface.COMMIT_OK_READ_ONLY;
+    }
+  }
 
+  /**
+   * @param transactionId
+   * @return true if commit is successful
+   * @throws IOException
+   */
+  public boolean commitIfPossible(final long transactionId) throws IOException {
+    int status = commitRequest(transactionId);
+
+    if (status == TransactionalRegionInterface.COMMIT_OK) {
+      commit(transactionId);
+      return true;
+    } else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) {
       return true;
     }
+    return false;
   }
 
   private boolean hasConflict(final TransactionState state) {
@@ -447,6 +502,7 @@
    * @throws IOException
    */
   public void commit(final long transactionId) throws IOException {
+    // Not checking closing...
     TransactionState state;
     try {
       state = getTransactionState(transactionId);
@@ -473,11 +529,14 @@
    * @throws IOException
    */
   public void abort(final long transactionId) throws IOException {
+    // Not checking closing...
     TransactionState state;
     try {
       state = getTransactionState(transactionId);
     } catch (UnknownTransactionException e) {
-      LOG.error("Asked to abort unknown transaction: " + transactionId);
+      LOG.info("Asked to abort unknown transaction [" + transactionId
+          + "] in region [" + getRegionInfo().getRegionNameAsString()
+          + "], ignoring");
       return;
     }
 
@@ -520,12 +579,64 @@
     retireTransaction(state);
   }
 
+  @Override
+  public List<HStoreFile> close(boolean abort) throws IOException {
+    prepareToClose();
+    if (!commitPendingTransactions.isEmpty()) {
+      // FIXME, better way to handle?
+      LOG.warn("Closing transactional region ["
+          + getRegionInfo().getRegionNameAsString() + "], but still have ["
+          + commitPendingTransactions.size()
+          + "] transactions  that are pending commit");
+    }
+    return super.close(abort);
+  }
+
+  @Override
+  protected void prepareToSplit() {
+    prepareToClose();
+  }
+
+  boolean closing = false;
+
+  /**
+   * Get ready to close.
+   * 
+   */
+  void prepareToClose() {
+    LOG.info("Preparing to close region "
+        + getRegionInfo().getRegionNameAsString());
+    closing = true;
+
+    while (!commitPendingTransactions.isEmpty()) {
+      LOG.info("Preparing to closing transactional region ["
+          + getRegionInfo().getRegionNameAsString() + "], but still have ["
+          + commitPendingTransactions.size()
+          + "] transactions that are pending commit. Sleeping");
+      for (TransactionState s : commitPendingTransactions) {
+        LOG.info(s.toString());
+      }
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+  }
+
+  private void checkClosing() throws IOException {
+    if (closing) {
+      throw new IOException("closing region, no more transaction allowed");
+    }
+  }
+
   // Cancel leases, and removed from lease lookup. This transaction may still
   // live in commitedTransactionsBySequenceNumber and commitPendingTransactions
   private void retireTransaction(final TransactionState state) {
     String key = String.valueOf(state.getTransactionId());
     try {
-      transactionLeases.cancelLease(key);
+      transactionLeases.cancelLease(getLeaseId(state.getTransactionId()));
     } catch (LeaseException e) {
       // Ignore
     }
@@ -541,12 +652,14 @@
     state = transactionsById.get(key);
 
     if (state == null) {
-      LOG.trace("Unknown transaction: " + key);
-      throw new UnknownTransactionException(key);
+      LOG.debug("Unknown transaction: [" + key + "], region: ["
+          + getRegionInfo().getRegionNameAsString() + "]");
+      throw new UnknownTransactionException("transaction: [" + key
+          + "], region: [" + getRegionInfo().getRegionNameAsString() + "]");
     }
 
     try {
-      transactionLeases.renewLease(key);
+      transactionLeases.renewLease(getLeaseId(transactionId));
     } catch (LeaseException e) {
       throw new RuntimeException(e);
     }
@@ -555,11 +668,11 @@
   }
 
   private void maybeTriggerOldTransactionFlush() {
-      if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
-        removeUnNeededCommitedTransactions();
-      }
+    if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
+      removeUnNeededCommitedTransactions();
+    }
   }
-  
+
   /**
    * Cleanup references to committed transactions that are no longer needed.
    * 
@@ -586,20 +699,20 @@
     if (LOG.isDebugEnabled()) {
       StringBuilder debugMessage = new StringBuilder();
       if (numRemoved > 0) {
-        debugMessage.append("Removed ").append(numRemoved).append(
-            " commited transactions");
+        debugMessage.append("Removed [").append(numRemoved).append(
+            "] commited transactions");
 
         if (minStartSeqNumber == Integer.MAX_VALUE) {
           debugMessage.append("with any sequence number");
         } else {
-          debugMessage.append("with sequence lower than ").append(
-              minStartSeqNumber).append(".");
+          debugMessage.append("with sequence lower than [").append(
+              minStartSeqNumber).append("].");
         }
         if (!commitedTransactionsBySequenceNumber.isEmpty()) {
-          debugMessage.append(" Still have ").append(
-              commitedTransactionsBySequenceNumber.size()).append(" left.");
+          debugMessage.append(" Still have [").append(
+              commitedTransactionsBySequenceNumber.size()).append("] left.");
         } else {
-          debugMessage.append("None left.");
+          debugMessage.append(" None left.");
         }
         LOG.debug(debugMessage.toString());
       } else if (commitedTransactionsBySequenceNumber.size() > 0) {
@@ -612,8 +725,13 @@
   }
 
   private Integer getMinStartSequenceNumber() {
+    LinkedList<TransactionState> transactionStates;
+    synchronized (transactionsById) {
+      transactionStates = new LinkedList<TransactionState>(transactionsById
+          .values());
+    }
     Integer min = null;
-    for (TransactionState transactionState : transactionsById.values()) {
+    for (TransactionState transactionState : transactionStates) {
       if (min == null || transactionState.getStartSequenceNumber() < min) {
         min = transactionState.getStartSequenceNumber();
       }
@@ -623,10 +741,16 @@
 
   // TODO, resolve from the global transaction log
   @SuppressWarnings("unused")
-  private void resolveTransactionFromLog(final long transactionId) {
-    throw new RuntimeException("Globaql transaction log is not Implemented");
+  private void resolveTransactionFromLog(final TransactionState transactionState)
+      throws IOException {
+    LOG
+        .error("Global transaction log is not Implemented. (Optimisticly) assuming transaction commit!");
+    commit(transactionState);
+    // throw new RuntimeException("Global transaction log is not Implemented");
   }
 
+  private static final int MAX_COMMIT_PENDING_WAITS = 10;
+
   private class TransactionLeaseListener implements LeaseListener {
     private final String transactionName;
 
@@ -635,7 +759,8 @@
     }
 
     public void leaseExpired() {
-      LOG.info("Transaction " + this.transactionName + " lease expired");
+      LOG.info("Transaction [" + this.transactionName + "] expired in region ["
+          + getRegionInfo().getRegionNameAsString() + "]");
       TransactionState s = null;
       synchronized (transactionsById) {
         s = transactionsById.remove(transactionName);
@@ -652,8 +777,27 @@
       case COMMIT_PENDING:
         LOG.info("Transaction " + s.getTransactionId()
             + " expired in COMMIT_PENDING state");
-        LOG.info("Checking transaction status in transaction log");
-        resolveTransactionFromLog(s.getTransactionId());
+
+        try {
+          if (s.getCommitPendingWaits() > MAX_COMMIT_PENDING_WAITS) {
+            LOG.info("Checking transaction status in transaction log");
+            resolveTransactionFromLog(s);
+            break;
+          }
+          LOG.info("renewing lease and hoping for commit");
+          s.incrementCommitPendingWaits();
+          String key = Long.toString(s.getTransactionId());
+          transactionsById.put(key, s);
+          try {
+            transactionLeases.createLease(getLeaseId(s.getTransactionId()),
+                this);
+          } catch (LeaseStillHeldException e) {
+            transactionLeases.renewLease(getLeaseId(s.getTransactionId()));
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
         break;
       default:
         LOG.warn("Unexpected status on expired lease");
@@ -696,7 +840,10 @@
       TransactionState state = getTransactionState(transactionId);
 
       if (result) {
+        // TODO: Is this right???? St.Ack
+
         Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,
+
             Integer.MAX_VALUE);
         if (localWrites != null) {
           LOG

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=774570&r1=774569&r2=774570&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Wed May 13 22:35:55 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
@@ -57,8 +58,12 @@
  */
 public class TransactionalRegionServer extends HRegionServer implements
     TransactionalRegionInterface {
+  private static final String LEASE_TIME = "hbase.transaction.leasetime";
+  private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+  private static final int LEASE_CHECK_FREQUENCY = 1000;
+  
   static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
-
+  private final Leases transactionLeases;
   private final CleanOldTransactionsChore cleanOldTransactionsThread;
 
   /**
@@ -81,6 +86,9 @@
     super(address, conf);
     cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
         super.stopRequested);
+    transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+        LEASE_CHECK_FREQUENCY);
+    LOG.error("leases time:"+conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME));
   }
 
   @Override
@@ -104,6 +112,7 @@
     };
     Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
         + ".oldTransactionCleaner", handler);
+    Threads.setDaemonThreadRunning(this.transactionLeases, "Transactional leases");
 
   }
 
@@ -112,7 +121,7 @@
       throws IOException {
     HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
         .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
-        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
+        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester(), this.transactionLeases);
     r.initialize(null, new Progressable() {
       public void progress() {
         addProcessingMessage(regionInfo);
@@ -125,13 +134,29 @@
       throws NotServingRegionException {
     return (TransactionalRegion) super.getRegion(regionName);
   }
+  
+  protected Leases getTransactionalLeases() {
+    return this.transactionLeases;
+  }
 
+  /** We want to delay the close region for a bit if we have commit pending transactions.
+   * 
+   */
+  @Override
+  protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
+  throws IOException {
+    getTransactionalRegion(hri.getRegionName()).prepareToClose();
+    super.closeRegion(hri, reportWhenCompleted);
+  }
+  
   public void abort(final byte[] regionName, final long transactionId)
       throws IOException {
     checkOpen();
     super.getRequestCount().incrementAndGet();
     try {
       getTransactionalRegion(regionName).abort(transactionId);
+    } catch(NotServingRegionException e) {
+      LOG.info("Got not serving region durring abort. Ignoring.");
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -162,7 +187,7 @@
     }
   }
 
-  public boolean commitRequest(final byte[] regionName, final long transactionId)
+  public int commitRequest(final byte[] regionName, final long transactionId)
       throws IOException {
     checkOpen();
     super.getRequestCount().incrementAndGet();
@@ -173,6 +198,18 @@
       throw e;
     }
   }
+  
+  public boolean commitIfPossible(byte[] regionName, long transactionId)
+  throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).commitIfPossible(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
 
   public Cell get(final long transactionId, final byte[] regionName,
       final byte[] row, final byte[] column) throws IOException {
@@ -228,6 +265,8 @@
   public RowResult getRow(final long transactionId, final byte[] regionName,
       final byte[] row, final byte[][] columns, final long ts)
       throws IOException {
+    long startTime = System.nanoTime();
+
     checkOpen();
     super.getRequestCount().incrementAndGet();
     try {
@@ -239,9 +278,9 @@
       }
 
       TransactionalRegion region = getTransactionalRegion(regionName);
-      Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
       HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
-      result.putAll(map);
+      result.putAll(region.getFull(transactionId, row, columnSet, ts));
+      LOG.debug("Got row ["+Bytes.toString(row)+"] in ["+((System.nanoTime()-startTime) / 1000)+"]micro seconds");
       return new RowResult(row, result);
     } catch (IOException e) {
       checkFileSystem();



Mime
View raw message