hbase-commits mailing list archives

From apurt...@apache.org
Subject svn commit: r789195 [11/11] - in /hadoop/hbase/trunk_on_hadoop-0.18.3/contrib: ./ stargate/ stargate/conf/ stargate/lib/ stargate/src/ stargate/src/java/ stargate/src/java/org/ stargate/src/java/org/apache/ stargate/src/java/org/apache/hadoop/ stargate...
Date Mon, 29 Jun 2009 02:49:26 GMT
Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,733 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LeaseException;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Region which provides support for atomic transactions. This is achieved
+ * with optimistic concurrency control (see
+ * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track
+ * of the read and write sets for each transaction, and hold off on processing
+ * the writes. To decide whether a transaction can commit, we check its read
+ * set for overlaps against all transactions that committed while it was
+ * running.
+ * <p>
+ * Because transactions can span multiple regions, all regions must agree to
+ * commit a transaction. The client side of this commit protocol is encoded in
+ * org.apache.hadoop.hbase.client.transactional.TransactionManager.
+ * <p>
+ * In the event of a client failure mid-commit (after we have voted yes), we
+ * will have to consult the transaction log to determine the final decision of
+ * the transaction. This is not yet implemented.
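+ * <p>
+ * A rough sketch of the client-side flow this region participates in (the
+ * client calls below are illustrative, drawn from the tests in this commit,
+ * not a complete API reference):
+ * <pre>
+ * TransactionManager mgr = new TransactionManager(conf);
+ * TransactionState tx = mgr.beginTransaction(); // start logged in each region touched
+ * table.put(tx, new Put(row));                  // buffered in the region's write set
+ * mgr.tryCommit(tx);                            // commitRequest() vote, then commit()/abort()
+ * </pre>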
+ */
+public class TransactionalRegion extends HRegion {
+
+  private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush";
+  // Trigger a flush if we are tracking this many old transactions.
+  private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100;
+
+  static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
+
+  // Collection of active transactions (PENDING) keyed by id.
+  protected Map<String, TransactionState> transactionsById = new HashMap<String, TransactionState>();
+
+  // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
+  // their sequence number
+  private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
+      .synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
+
+  // Collection of transactions that are COMMIT_PENDING
+  private Set<TransactionState> commitPendingTransactions = Collections
+      .synchronizedSet(new HashSet<TransactionState>());
+
+  private AtomicInteger nextSequenceId = new AtomicInteger(0);
+  private Object commitCheckLock = new Object();
+  private THLog hlog;
+  private final int oldTransactionFlushTrigger;
+  private final Leases transactionLeases;
+
+  /**
+   * @param basedir
+   * @param log
+   * @param fs
+   * @param conf
+   * @param regionInfo
+   * @param flushListener
+   */
+  public TransactionalRegion(final Path basedir, final HLog log,
+      final FileSystem fs, final HBaseConfiguration conf,
+      final HRegionInfo regionInfo, final FlushRequester flushListener,
+      final Leases transactionalLeases) {
+    super(basedir, log, fs, conf, regionInfo, flushListener);
+    this.hlog = (THLog) log;
+    oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH,
+        DEFAULT_OLD_TRANSACTION_FLUSH);
+    this.transactionLeases = transactionalLeases;
+  }
+
+  @Override
+  protected void doReconstructionLog(final Path oldLogFile,
+      final long minSeqId, final long maxSeqId, final Progressable reporter)
+      throws UnsupportedEncodingException, IOException {
+    super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
+    THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
+    Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
+        .getCommitsFromLog(oldLogFile, minSeqId, reporter);
+
+    if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
+      LOG.debug("found " + commitedTransactionsById.size()
+          + " COMMITED transactions");
+
+      for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
+          .entrySet()) {
+        LOG.debug("Writing " + entry.getValue().size()
+            + " updates for transaction " + entry.getKey());
+        for (KeyValue b : entry.getValue()) {
+          Put put = new Put(b.getRow());
+          put.add(b);
+          super.put(put, true); // These get written to the WAL, so they live forever
+        }
+      }
+
+      // LOG.debug("Flushing cache"); // We must trigger a cache flush,
+      // otherwise we would ignore the log on subsequent failure
+      // if (!super.flushcache()) {
+      //   LOG.warn("Did not flush cache");
+      // }
+    }
+  }
+
+  /**
+   * We need to make sure that we don't complete a cache flush between running
+   * transactions. If we did, then we would not find all log messages needed to
+   * restore the transaction, as some of them would be before the last
+   * "complete" flush id.
+   */
+  @Override
+  protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
+    LinkedList<TransactionState> transactionStates;
+    synchronized (transactionsById) {
+      transactionStates = new LinkedList<TransactionState>(transactionsById
+          .values());
+    }
+
+    long minPendingStartSequenceId = currentSequenceId;
+    for (TransactionState transactionState : transactionStates) {
+      minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
+          transactionState.getHLogStartSequenceId());
+    }
+    return minPendingStartSequenceId;
+  }
+
+  /**
+   * @param transactionId
+   * @throws IOException
+   */
+  public void beginTransaction(final long transactionId) throws IOException {
+    checkClosing();
+    String key = String.valueOf(transactionId);
+    if (transactionsById.get(key) != null) {
+      TransactionState alias = getTransactionState(transactionId);
+      if (alias != null) {
+        alias.setStatus(Status.ABORTED);
+        retireTransaction(alias);
+      }
+      LOG.error("Existing trasaction with id [" + key + "] in region ["
+          + super.getRegionInfo().getRegionNameAsString() + "]");
+      throw new IOException("Already exiting transaction id: " + key);
+    }
+
+    TransactionState state = new TransactionState(transactionId, super.getLog()
+        .getSequenceNumber(), super.getRegionInfo());
+
+    // Order is important here: snapshot the commit-pending transactions
+    // before reading the next sequence id, so a transaction that commits
+    // concurrently is seen either in this copy or by the later conflict check.
+    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(
+        commitPendingTransactions);
+    for (TransactionState commitPending : commitPendingCopy) {
+      state.addTransactionToCheck(commitPending);
+    }
+    state.setStartSequenceNumber(nextSequenceId.get());
+
+    synchronized (transactionsById) {
+      transactionsById.put(key, state);
+    }
+    try {
+      transactionLeases.createLease(getLeaseId(transactionId),
+          new TransactionLeaseListener(key));
+    } catch (LeaseStillHeldException e) {
+      LOG.error("Lease still held for [" + key + "] in region ["
+          + super.getRegionInfo().getRegionNameAsString() + "]");
+      throw new RuntimeException(e);
+    }
+    LOG.debug("Begining transaction " + key + " in region "
+        + super.getRegionInfo().getRegionNameAsString());
+    this.hlog.writeStartToLog(super.getRegionInfo(), transactionId);
+
+    maybeTriggerOldTransactionFlush();
+  }
+
+  private String getLeaseId(long transactionId) {
+    return super.getRegionInfo().getRegionNameAsString() + transactionId;
+  }
+
+  public Result get(final long transactionId, Get get) throws IOException {
+    checkClosing();
+
+    TransactionState state = getTransactionState(transactionId);
+
+    state.addRead(get.getRow());
+
+    Result superGet = super.get(get, null);
+    Result localGet = state.localGet(get);
+
+    if (localGet != null) {
+      LOG
+          .trace("Transactional get of something we've written in the same transaction "
+              + transactionId);
+
+      List<KeyValue> mergedGet = new ArrayList<KeyValue>(Arrays.asList(localGet
+          .raw()));
+
+      if (superGet != null && !superGet.isEmpty()) {
+        for (KeyValue kv : superGet.raw()) {
+          if (!localGet.containsColumn(kv.getFamily(), kv.getQualifier())) {
+            mergedGet.add(kv);
+          }
+        }
+      }
+      return new Result(mergedGet);
+    }
+
+    return superGet;
+  }
+
+  /**
+   * Get a transactional scanner.
+   */
+  public InternalScanner getScanner(final long transactionId, Scan scan)
+      throws IOException {
+    checkClosing();
+
+    TransactionState state = getTransactionState(transactionId);
+    state.addScan(scan);
+    return new ScannerWrapper(transactionId, super.getScanner(scan));
+  }
+
+  /**
+   * Add a write to the transaction. Does not get applied until commit process.
+   * 
+   * @param transactionId
+   * @param put
+   * @throws IOException
+   */
+  public void put(final long transactionId, final Put put) throws IOException {
+    checkClosing();
+
+    TransactionState state = getTransactionState(transactionId);
+    state.addWrite(put);
+    this.hlog.writeUpdateToLog(super.getRegionInfo(), transactionId, put);
+  }
+
+  /**
+   * Add multiple writes to the transaction. Does not get applied until commit
+   * process.
+   * 
+   * @param transactionId
+   * @param puts
+   * @throws IOException
+   */
+  public void put(final long transactionId, final Put[] puts)
+      throws IOException {
+    checkClosing();
+
+    TransactionState state = getTransactionState(transactionId);
+    for (Put put : puts) {
+      state.addWrite(put);
+      this.hlog.writeUpdateToLog(super.getRegionInfo(), transactionId, put);
+    }
+  }
+
+  /**
+   * Add a delete to the transaction. Does not get applied until commit process.
+   * FIXME, not sure about this approach
+   * 
+   * @param transactionId
+   * @param delete
+   * @throws IOException
+   */
+  public void delete(final long transactionId, Delete delete)
+      throws IOException {
+    checkClosing();
+    TransactionState state = getTransactionState(transactionId);
+    state.addDelete(delete);
+    this.hlog.writeDeleteToLog(super.getRegionInfo(), transactionId, delete);
+  }
+
+  /**
+   * @param transactionId
+   * @return TransactionalRegionInterface commit code
+   * @throws IOException
+   */
+  public int commitRequest(final long transactionId) throws IOException {
+    checkClosing();
+
+    synchronized (commitCheckLock) {
+      TransactionState state = getTransactionState(transactionId);
+      if (state == null) {
+        return TransactionalRegionInterface.COMMIT_UNSUCESSFUL;
+      }
+
+      if (hasConflict(state)) {
+        state.setStatus(Status.ABORTED);
+        retireTransaction(state);
+        return TransactionalRegionInterface.COMMIT_UNSUCESSFUL;
+      }
+
+      // No conflicts, we can commit.
+      LOG.trace("No conflicts for transaction " + transactionId
+          + " found in region " + super.getRegionInfo().getRegionNameAsString()
+          + ". Voting for commit");
+
+      // If there are writes we must keep a record of the transaction
+      if (state.getWriteSet().size() > 0) {
+        // Order is important
+        state.setStatus(Status.COMMIT_PENDING);
+        commitPendingTransactions.add(state);
+        state.setSequenceNumber(nextSequenceId.getAndIncrement());
+        commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
+            state);
+        return TransactionalRegionInterface.COMMIT_OK;
+      }
+      // Otherwise we were read-only and committable, so we can forget it.
+      state.setStatus(Status.COMMITED);
+      retireTransaction(state);
+      return TransactionalRegionInterface.COMMIT_OK_READ_ONLY;
+    }
+  }
+
+  /**
+   * @param transactionId
+   * @return true if commit is successful
+   * @throws IOException
+   */
+  public boolean commitIfPossible(final long transactionId) throws IOException {
+    int status = commitRequest(transactionId);
+
+    if (status == TransactionalRegionInterface.COMMIT_OK) {
+      commit(transactionId);
+      return true;
+    } else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean hasConflict(final TransactionState state) {
+    // Check transactions that were committed while we were running
+    for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
+      TransactionState other = commitedTransactionsBySequenceNumber.get(i);
+      if (other == null) {
+        continue;
+      }
+      state.addTransactionToCheck(other);
+    }
+
+    return state.hasConflict();
+  }
+
+  /**
+   * Commit the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void commit(final long transactionId) throws IOException {
+    // Not checking closing...
+    TransactionState state;
+    try {
+      state = getTransactionState(transactionId);
+    } catch (UnknownTransactionException e) {
+      LOG.fatal("Asked to commit unknown transaction: " + transactionId
+          + " in region " + super.getRegionInfo().getRegionNameAsString());
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw e;
+    }
+
+    if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+      LOG.fatal("Asked to commit a non pending transaction");
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw new IOException("commit failure");
+    }
+
+    commit(state);
+  }
+
+  /**
+   * Abort the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void abort(final long transactionId) throws IOException {
+    // Not checking closing...
+    TransactionState state;
+    try {
+      state = getTransactionState(transactionId);
+    } catch (UnknownTransactionException e) {
+      LOG.info("Asked to abort unknown transaction [" + transactionId
+          + "] in region [" + getRegionInfo().getRegionNameAsString()
+          + "], ignoring");
+      return;
+    }
+
+    state.setStatus(Status.ABORTED);
+
+    this.hlog.writeAbortToLog(super.getRegionInfo(), state.getTransactionId());
+
+    // The following removes are needed if we have voted
+    if (state.getSequenceNumber() != null) {
+      commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
+    }
+    commitPendingTransactions.remove(state);
+
+    retireTransaction(state);
+  }
+
+  private void commit(final TransactionState state) throws IOException {
+
+    LOG.debug("Commiting transaction: " + state.toString() + " to "
+        + super.getRegionInfo().getRegionNameAsString());
+
+    this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
+
+    for (Put update : state.getWriteSet()) {
+      this.put(update, false); // Don't need to WAL these
+      // FIXME, maybe these should be WALed so we don't need to look so far back.
+    }
+
+    state.setStatus(Status.COMMITED);
+    if (state.getWriteSet().size() > 0
+        && !commitPendingTransactions.remove(state)) {
+      LOG.fatal("Committing a transaction with writes that is not in commitPendingTransactions");
+      throw new IOException("commit failure"); // FIXME, how to handle?
+    }
+    retireTransaction(state);
+  }
+
+  @Override
+  public List<StoreFile> close(boolean abort) throws IOException {
+    prepareToClose();
+    if (!commitPendingTransactions.isEmpty()) {
+      // FIXME, better way to handle?
+      LOG.warn("Closing transactional region ["
+          + getRegionInfo().getRegionNameAsString() + "], but still have ["
+          + commitPendingTransactions.size()
+          + "] transactions  that are pending commit");
+    }
+    return super.close(abort);
+  }
+
+  @Override
+  protected void prepareToSplit() {
+    prepareToClose();
+  }
+
+  boolean closing = false;
+
+  /**
+   * Get ready to close.
+   * 
+   */
+  void prepareToClose() {
+    LOG.info("Preparing to close region "
+        + getRegionInfo().getRegionNameAsString());
+    closing = true;
+
+    while (!commitPendingTransactions.isEmpty()) {
+      LOG.info("Preparing to closing transactional region ["
+          + getRegionInfo().getRegionNameAsString() + "], but still have ["
+          + commitPendingTransactions.size()
+          + "] transactions that are pending commit. Sleeping");
+      for (TransactionState s : commitPendingTransactions) {
+        LOG.info(s.toString());
+      }
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+  }
+
+  private void checkClosing() throws IOException {
+    if (closing) {
+      throw new IOException("closing region, no more transaction allowed");
+    }
+  }
+
+  // Cancel leases and remove from lease lookup. This transaction may still
+  // live in commitedTransactionsBySequenceNumber and commitPendingTransactions
+  private void retireTransaction(final TransactionState state) {
+    String key = String.valueOf(state.getTransactionId());
+    try {
+      transactionLeases.cancelLease(getLeaseId(state.getTransactionId()));
+    } catch (LeaseException e) {
+      // Ignore
+    }
+
+    transactionsById.remove(key);
+  }
+
+  protected TransactionState getTransactionState(final long transactionId)
+      throws UnknownTransactionException {
+    String key = String.valueOf(transactionId);
+    TransactionState state = transactionsById.get(key);
+
+    if (state == null) {
+      LOG.debug("Unknown transaction: [" + key + "], region: ["
+          + getRegionInfo().getRegionNameAsString() + "]");
+      throw new UnknownTransactionException("transaction: [" + key
+          + "], region: [" + getRegionInfo().getRegionNameAsString() + "]");
+    }
+
+    try {
+      transactionLeases.renewLease(getLeaseId(transactionId));
+    } catch (LeaseException e) {
+      throw new RuntimeException(e);
+    }
+
+    return state;
+  }
+
+  private void maybeTriggerOldTransactionFlush() {
+    if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
+      removeUnNeededCommitedTransactions();
+    }
+  }
+
+  /**
+   * Cleanup references to committed transactions that are no longer needed.
+   * 
+   */
+  synchronized void removeUnNeededCommitedTransactions() {
+    Integer minStartSeqNumber = getMinStartSequenceNumber();
+    if (minStartSeqNumber == null) {
+      minStartSeqNumber = Integer.MAX_VALUE; // Remove all
+    }
+
+    int numRemoved = 0;
+    // Copy the entry set to avoid a ConcurrentModificationException
+    for (Entry<Integer, TransactionState> entry : new LinkedList<Entry<Integer, TransactionState>>(
+        commitedTransactionsBySequenceNumber.entrySet())) {
+      if (entry.getKey() >= minStartSeqNumber) {
+        break;
+      }
+      if (commitedTransactionsBySequenceNumber.remove(entry.getKey()) != null) {
+        numRemoved++;
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      StringBuilder debugMessage = new StringBuilder();
+      if (numRemoved > 0) {
+        debugMessage.append("Removed [").append(numRemoved).append(
+            "] commited transactions");
+
+        if (minStartSeqNumber == Integer.MAX_VALUE) {
+          debugMessage.append("with any sequence number");
+        } else {
+          debugMessage.append("with sequence lower than [").append(
+              minStartSeqNumber).append("].");
+        }
+        if (!commitedTransactionsBySequenceNumber.isEmpty()) {
+          debugMessage.append(" Still have [").append(
+              commitedTransactionsBySequenceNumber.size()).append("] left.");
+        } else {
+          debugMessage.append(" None left.");
+        }
+        LOG.debug(debugMessage.toString());
+      } else if (commitedTransactionsBySequenceNumber.size() > 0) {
+        debugMessage.append(
+            "Could not remove any transactions, and still have ").append(
+            commitedTransactionsBySequenceNumber.size()).append(" left");
+        LOG.debug(debugMessage.toString());
+      }
+    }
+  }
+
+  private Integer getMinStartSequenceNumber() {
+    LinkedList<TransactionState> transactionStates;
+    synchronized (transactionsById) {
+      transactionStates = new LinkedList<TransactionState>(transactionsById
+          .values());
+    }
+    Integer min = null;
+    for (TransactionState transactionState : transactionStates) {
+      if (min == null || transactionState.getStartSequenceNumber() < min) {
+        min = transactionState.getStartSequenceNumber();
+      }
+    }
+    return min;
+  }
+
+  private void resolveTransactionFromLog(final TransactionState transactionState)
+      throws IOException {
+    LOG.error("Global transaction log is not implemented. "
+        + "(Optimistically) assuming transaction committed!");
+    commit(transactionState);
+    // throw new RuntimeException("Global transaction log is not Implemented");
+  }
+
+
+  private static final int MAX_COMMIT_PENDING_WAITS = 10;
+
+  private class TransactionLeaseListener implements LeaseListener {
+    private final String transactionName;
+
+    TransactionLeaseListener(final String n) {
+      this.transactionName = n;
+    }
+
+    public void leaseExpired() {
+      LOG.info("Transaction [" + this.transactionName + "] expired in region ["
+          + getRegionInfo().getRegionNameAsString() + "]");
+      TransactionState s = null;
+      synchronized (transactionsById) {
+        s = transactionsById.remove(transactionName);
+      }
+      if (s == null) {
+        LOG.warn("Unknown transaction expired " + this.transactionName);
+        return;
+      }
+
+      switch (s.getStatus()) {
+      case PENDING:
+        s.setStatus(Status.ABORTED); // Other transactions may have a ref
+        break;
+      case COMMIT_PENDING:
+        LOG.info("Transaction " + s.getTransactionId()
+            + " expired in COMMIT_PENDING state");
+
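+        // We have already voted yes for this transaction, so we cannot simply
+        // abort it here. Keep renewing the lease and waiting for the commit;
+        // after MAX_COMMIT_PENDING_WAITS attempts, fall back to resolving the
+        // outcome from the (not yet implemented) global transaction log.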
+        try {
+          if (s.getCommitPendingWaits() > MAX_COMMIT_PENDING_WAITS) {
+            LOG.info("Checking transaction status in transaction log");
+            resolveTransactionFromLog(s);
+            break;
+          }
+          LOG.info("renewing lease and hoping for commit");
+          s.incrementCommitPendingWaits();
+          String key = Long.toString(s.getTransactionId());
+          transactionsById.put(key, s);
+          try {
+            transactionLeases.createLease(getLeaseId(s.getTransactionId()),
+                this);
+          } catch (LeaseStillHeldException e) {
+            transactionLeases.renewLease(getLeaseId(s.getTransactionId()));
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        break;
+      default:
+        LOG.warn("Unexpected status on expired lease");
+      }
+    }
+  }
+
+  /** Wrapper which keeps track of rows returned by scanner. */
+  private class ScannerWrapper implements InternalScanner {
+    private long transactionId;
+    private InternalScanner scanner;
+
+    /**
+     * @param transactionId
+     * @param scanner
+     * @throws UnknownTransactionException
+     */
+    public ScannerWrapper(final long transactionId,
+        final InternalScanner scanner) throws UnknownTransactionException {
+
+      this.transactionId = transactionId;
+      this.scanner = scanner;
+    }
+
+    public void close() throws IOException {
+      scanner.close();
+    }
+
+    public boolean next(List<KeyValue> results) throws IOException {
+      boolean result = scanner.next(results);
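+      // getTransactionState renews the transaction lease as a side effect.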
+      TransactionState state = getTransactionState(transactionId);
+      // FIXME need to weave in new stuff from this transaction too.
+      return result;
+    }
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,245 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
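+ * <p>
+ * A cluster is pointed at this implementation through configuration; the
+ * tests in this commit do it like this:
+ * <pre>
+ * conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class.getName());
+ * conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class.getName());
+ * </pre>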
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+    TransactionalRegionInterface {
+  private static final String LEASE_TIME = "hbase.transaction.leasetime";
+  private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+  private static final int LEASE_CHECK_FREQUENCY = 1000;
+  
+  static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+  private final Leases transactionLeases;
+  private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+  /**
+   * @param address
+   * @param conf
+   * @throws IOException
+   */
+  public TransactionalRegionServer(final HBaseConfiguration conf)
+      throws IOException {
+    super(conf);
+    cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+        super.stopRequested);
+    transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+        LEASE_CHECK_FREQUENCY);
+    LOG.error("leases time:"+conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME));
+  }
+
+  @Override
+  public long getProtocolVersion(final String protocol, final long clientVersion)
+      throws IOException {
+    if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+      return HBaseRPCProtocolVersion.versionID;
+    }
+    return super.getProtocolVersion(protocol, clientVersion);
+  }
+
+  @Override
+  protected void init(final MapWritable c) throws IOException {
+    super.init(c);
+    String n = Thread.currentThread().getName();
+    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+      public void uncaughtException(final Thread t, final Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+        + ".oldTransactionCleaner", handler);
+    Threads.setDaemonThreadRunning(this.transactionLeases, "Transactional leases");
+
+  }
+
+  @Override
+  protected HLog instantiateHLog(Path logdir) throws IOException {
+    HLog newlog = new THLog(super.getFileSystem(), logdir, conf, super.getLogRoller());
+    return newlog;
+  }
+  
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
+        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester(), this.getTransactionalLeases());
+    r.initialize(null, new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+    return r;
+  }
+
+  protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+      throws NotServingRegionException {
+    return (TransactionalRegion) super.getRegion(regionName);
+  }
+  
+  protected Leases getTransactionalLeases() {
+    return this.transactionLeases;
+  }
+
+  /**
+   * We want to delay closing the region for a bit if we have commit-pending
+   * transactions.
+   */
+  @Override
+  protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
+  throws IOException {
+    getTransactionalRegion(hri.getRegionName()).prepareToClose();
+    super.closeRegion(hri, reportWhenCompleted);
+  }
+  
+  public void abort(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).abort(transactionId);
+    } catch(NotServingRegionException e) {
+      LOG.info("Got not serving region durring abort. Ignoring.");
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void commit(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).commit(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public int commitRequest(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).commitRequest(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+  
+  public boolean commitIfPossible(byte[] regionName, long transactionId)
+  throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).commitIfPossible(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public long openScanner(final long transactionId, byte [] regionName, Scan scan)
+  throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if (regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if (scan == null) {
+      npe = new NullPointerException("scan is null");
+    }
+    if (npe != null) {
+      throw new IOException("Invalid arguments to openScanner", npe);
+    }
+    super.getRequestCount().incrementAndGet();
+    try {
+      TransactionalRegion r = getTransactionalRegion(regionName);
+      InternalScanner s = r.getScanner(transactionId, scan);
+      long scannerId = addScanner(s);
+      return scannerId;
+    } catch (IOException e) {
+      LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+          RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void beginTransaction(final long transactionId, final byte[] regionName)
+      throws IOException {
+    getTransactionalRegion(regionName).beginTransaction(transactionId);
+  }
+
+  public void delete(long transactionId, byte[] regionName, Delete delete)
+      throws IOException {
+    getTransactionalRegion(regionName).delete(transactionId, delete);
+  }
+
+  public Result get(long transactionId, byte[] regionName, Get get)
+      throws IOException {
+    return getTransactionalRegion(regionName).get(transactionId, get);
+  }
+
+  public void put(long transactionId, byte[] regionName, Put put)
+      throws IOException {
+    getTransactionalRegion(regionName).put(transactionId, put);
+    
+  }
+
+  public int put(long transactionId, byte[] regionName, Put[] puts)
+      throws IOException {
+    getTransactionalRegion(regionName).put(transactionId, puts);
+    return puts.length; // ??
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestIndexedTable extends HBaseClusterTestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestIndexedTable.class);
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY_COLON = Bytes.toBytes("family:");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUAL_A = Bytes.toBytes("a");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+  private static final String INDEX_COL_A = "A";
+
+  private static final int NUM_ROWS = 10;
+  private static final int MAX_VAL = 10000;
+
+  private IndexedTableAdmin admin;
+  private IndexedTable table;
+  private Random random = new Random();
+
+  /** constructor */
+  public TestIndexedTable() {
+    conf
+        .set(HConstants.REGION_SERVER_IMPL, IndexedRegionServer.class.getName());
+    conf.setInt("hbase.master.info.port", -1);
+    conf.setInt("hbase.regionserver.info.port", -1);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY_COLON));
+
+    IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
+    // Create a new index that does lexicographic ordering on COL_A
+    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A,
+        COL_A);
+    indexDesc.addIndex(colAIndex);
+
+    admin = new IndexedTableAdmin(conf);
+    admin.createIndexedTable(indexDesc);
+    table = new IndexedTable(conf, desc.getName());
+  }
+
+  private void writeInitialRows() throws IOException {
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Put update = new Put(PerformanceEvaluation.format(i));
+      byte[] valueA = PerformanceEvaluation.format(random.nextInt(MAX_VAL));
+      update.add(FAMILY, QUAL_A, valueA);
+      table.put(update);
+      LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: ["
+          + Bytes.toString(valueA) + "]");
+    }
+  }
+
+
+  public void testInitialWrites() throws IOException {
+    writeInitialRows();
+    assertRowsInOrder(NUM_ROWS);
+  }
+  
+  private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException {
+    ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A,
+        HConstants.EMPTY_START_ROW, null, null, null);
+    int numRows = 0;
+    byte[] lastColA = null;
+    for (Result rowResult : scanner) {
+      byte[] colA = rowResult.getValue(COL_A);
+      LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow())
+          + "] value [" + Bytes.toString(colA) + "]");
+      if (lastColA != null) {
+        Assert.assertTrue(Bytes.compareTo(lastColA, colA) <= 0);
+      }
+      lastColA = colA;
+      numRows++;
+    }
+    scanner.close();
+    Assert.assertEquals(numRowsExpected, numRows);  
+  }
+
+  public void testMultipleWrites() throws IOException {
+    writeInitialRows();
+    writeInitialRows(); // Update the rows.
+    assertRowsInOrder(NUM_ROWS);
+  }
+  
+  public void testDelete() throws IOException {
+    writeInitialRows();
+    // Delete the first row.
+    table.deleteAll(PerformanceEvaluation.format(0));
+    
+    assertRowsInOrder(NUM_ROWS - 1);    
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,416 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stress test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}. We run many threads doing reads/writes
+ * which may conflict with each other. We have two types of transactions, those
+ * which operate on rows of a single table, and those which operate on rows
+ * across multiple tables. Each transaction type has a modification operation
+ * which changes two values while maintaining the sum. Also each transaction
+ * type has a consistency-check operation which sums all rows and verifies that
+ * the sum is as expected.
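+ * <p>
+ * For example, a single-table transfer of amount n applies row1 -= n and
+ * row2 += n inside one transaction, so the sum over a table's rows should
+ * always remain NUM_ST_ROWS * INITIAL_VALUE.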
+ */
+public class StressTestTransactions extends HBaseClusterTestCase {
+  protected static final Log LOG = LogFactory
+      .getLog(StressTestTransactions.class);
+
+  private static final int NUM_TABLES = 3;
+  private static final int NUM_ST_ROWS = 3;
+  private static final int NUM_MT_ROWS = 3;
+  private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
+  private static final int NUM_SINGLE_TABLE_THREADS = 6;
+  private static final int NUM_MULTI_TABLE_THREADS = 6;
+  private static final int PRE_COMMIT_SLEEP = 10;
+  protected static final Random RAND = new Random();
+
+  private static final byte[] FAMILY_COLON = Bytes.toBytes("family:");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUAL_A = Bytes.toBytes("a");
+  static final byte[] COL = Bytes.toBytes("family:a");
+
+  private HBaseAdmin admin;
+  protected TransactionalTable[] tables;
+  protected TransactionManager transactionManager;
+
+  /** constructor */
+  public StressTestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    tables = new TransactionalTable[NUM_TABLES];
+
+    for (int i = 0; i < tables.length; i++) {
+      HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
+      desc.addFamily(new HColumnDescriptor(FAMILY_COLON));
+      admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+      tables[i] = new TransactionalTable(conf, desc.getName());
+    }
+
+    transactionManager = new TransactionManager(conf);
+  }
+
+  private String makeTableName(final int i) {
+    return "table" + i;
+  }
+
+  private void writeInitialValues() throws IOException {
+    for (TransactionalTable table : tables) {
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        table.put(new Put(makeSTRow(i)).add(FAMILY, QUAL_A, Bytes
+            .toBytes(SingleTableTransactionThread.INITIAL_VALUE)));
+      }
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        table.put(new Put(makeMTRow(i)).add(FAMILY, QUAL_A, Bytes
+            .toBytes(MultiTableTransactionThread.INITIAL_VALUE)));
+      }
+    }
+  }
+
+  protected byte[] makeSTRow(final int i) {
+    return Bytes.toBytes("st" + i);
+  }
+
+  protected byte[] makeMTRow(final int i) {
+    return Bytes.toBytes("mt" + i);
+  }
+
+  static int nextThreadNum = 1;
+  protected static final AtomicBoolean stopRequest = new AtomicBoolean(false);
+  static final AtomicBoolean consistencyFailure = new AtomicBoolean(false);
+
+  // Thread which runs transactions
+  abstract class TransactionThread extends Thread {
+    private int numRuns = 0;
+    private int numAborts = 0;
+    private int numUnknowns = 0;
+
+    public TransactionThread(final String namePrefix) {
+      super.setName(namePrefix + "transaction " + nextThreadNum++);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
+        if (stopRequest.get()) {
+          return;
+        }
+        try {
+          numRuns++;
+          transaction();
+        } catch (UnknownTransactionException e) {
+          numUnknowns++;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (CommitUnsuccessfulException e) {
+          numAborts++;
+        }
+      }
+    }
+
+    protected abstract void transaction() throws IOException,
+        CommitUnsuccessfulException;
+
+    public int getNumAborts() {
+      return numAborts;
+    }
+
+    public int getNumUnknowns() {
+      return numUnknowns;
+    }
+
+    protected void preCommitSleep() {
+      try {
+        Thread.sleep(PRE_COMMIT_SLEEP);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected void consistencyFailure() {
+      LOG.fatal("Consistency failure");
+      stopRequest.set(true);
+      consistencyFailure.set(true);
+    }
+
+    /**
+     * Get the numRuns.
+     * 
+     * @return Return the numRuns.
+     */
+    public int getNumRuns() {
+      return numRuns;
+    }
+
+  }
+
+  // Atomically change the value of two rows while maintaining the sum.
+  // This should preserve the global sum of the rows, which is also checked
+  // with a transaction.
+  private class SingleTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 10;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private TransactionalTable table;
+    boolean doCheck = false;
+
+    public SingleTableTransactionThread() {
+      super("single table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      table = tables[RAND.nextInt(NUM_TABLES)];
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int row1Index = RAND.nextInt(NUM_ST_ROWS);
+      int row2Index;
+      do {
+        row2Index = RAND.nextInt(NUM_ST_ROWS);
+      } while (row2Index == row1Index);
+      byte[] row1 = makeSTRow(row1Index);
+      byte[] row2 = makeSTRow(row2Index);
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int row1Amount = Bytes.toInt(table.get(transactionState,
+          new Get(row1).addColumn(COL)).getValue(COL));
+      int row2Amount = Bytes.toInt(table.get(transactionState,
+          new Get(row2).addColumn(COL)).getValue(COL));
+
+      row1Amount -= transferAmount;
+      row2Amount += transferAmount;
+
+      table.put(transactionState, new Put(row1).add(FAMILY, QUAL_A, Bytes
+          .toBytes(row1Amount)));
+      table.put(transactionState, new Put(row2).add(FAMILY, QUAL_A, Bytes
+          .toBytes(row2Amount)));
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+      LOG.debug("Commited");
+    }
+
+    // Check the table we last mutated
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        totalSum += Bytes.toInt(table.get(transactionState,
+            new Get(makeSTRow(i)).addColumn(COL)).getValue(COL));
+      }
+
+      transactionManager.tryCommit(transactionState);
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  // Similar to SingleTable, but this time we maintain consistency across tables
+  // rather than rows
+  private class MultiTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 1000;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private byte[] row;
+    boolean doCheck = false;
+
+    public MultiTableTransactionThread() {
+      super("multi table");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int table1Index = RAND.nextInt(tables.length);
+      int table2Index;
+      do {
+        table2Index = RAND.nextInt(tables.length);
+      } while (table2Index == table1Index);
+
+      TransactionalTable table1 = tables[table1Index];
+      TransactionalTable table2 = tables[table2Index];
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int table1Amount = Bytes.toInt(table1.get(transactionState,
+          new Get(row).addColumn(COL)).getValue(COL));
+      int table2Amount = Bytes.toInt(table2.get(transactionState,
+          new Get(row).addColumn(COL)).getValue(COL));
+
+      table1Amount -= transferAmount;
+      table2Amount += transferAmount;
+
+      table1.put(transactionState, new Put(row).add(FAMILY, QUAL_A, Bytes
+          .toBytes(table1Amount)));
+      table2.put(transactionState, new Put(row).add(FAMILY, QUAL_A, Bytes
+          .toBytes(table2Amount)));
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+
+      LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
+      LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
+
+    }
+
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      int[] amounts = new int[tables.length];
+      for (int i = 0; i < tables.length; i++) {
+        int amount = Bytes.toInt(tables[i].get(transactionState,
+            new Get(row).addColumn(COL)).getValue(COL));
+        amounts[i] = amount;
+        totalSum += amount;
+      }
+
+      transactionManager.tryCommit(transactionState);
+
+      for (int i = 0; i < tables.length; i++) {
+        LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
+      }
+
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  public void testStressTransactions() throws IOException, InterruptedException {
+    writeInitialValues();
+
+    List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
+
+    for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new SingleTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new MultiTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      transactionThread.join();
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      LOG.info(transactionThread.getName() + " done with "
+          + transactionThread.getNumAborts() + " aborts, and "
+          + transactionThread.getNumUnknowns() + " unknown transactions of "
+          + transactionThread.getNumRuns());
+    }
+
+    doFinalConsistencyChecks();
+  }
+
+  private void doFinalConsistencyChecks() throws IOException {
+
+    int[] mtSums = new int[NUM_MT_ROWS];
+    for (int i = 0; i < mtSums.length; i++) {
+      mtSums[i] = 0;
+    }
+
+    for (TransactionalTable table : tables) {
+      int thisTableSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        thisTableSum += Bytes.toInt(table.get(new Get(row).addColumn(COL))
+            .getValue(COL));
+      }
+      Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
+
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        mtSums[i] += Bytes.toInt(table.get(new Get(row).addColumn(COL))
+            .getValue(COL));
+      }
+    }
+
+    for (int mtSum : mtSums) {
+      Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+    }
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY_COLON = Bytes.toBytes("family:");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUAL_A = Bytes.toBytes("a");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+  private HBaseAdmin admin;
+  private TransactionalTable table;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public TestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY_COLON));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+
+    transactionManager = new TransactionManager(conf);
+    writeInitialRow();
+  }
+
+  private void writeInitialRow() throws IOException {
+    table.put(new Put(ROW1).add(FAMILY, QUAL_A, Bytes.toBytes(1)));
+  }
+
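+  // An illustrative sketch of the client pattern these tests exercise,
+  // using the same calls as the test methods below:
+  //
+  //   TransactionState ts = transactionManager.beginTransaction();
+  //   Result r = table.get(ts, new Get(ROW1).addColumn(COL_A));
+  //   table.put(ts, new Put(ROW2).add(FAMILY, QUAL_A, r.getValue(COL_A)));
+  //   transactionManager.tryCommit(ts); // CommitUnsuccessfulException on conflict
+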
+  public void testSimpleTransaction() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState = makeTransaction1();
+    transactionManager.tryCommit(transactionState);
+  }
+
+  public void testTwoTransactionsWithoutConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState1);
+    transactionManager.tryCommit(transactionState2);
+  }
+
+  public void testTwoTransactionsWithConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState2);
+
+    try {
+      transactionManager.tryCommit(transactionState1);
+      fail();
+    } catch (CommitUnsuccessfulException e) {
+      // Good
+    }
+  }
+
+  // Read ROW1,COL_A and write its value into ROW2,COL_A and ROW3,COL_A
+  private TransactionState makeTransaction1() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+
+    table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
+    table.put(transactionState, new Put(ROW3).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
+
+    return transactionState;
+  }
+
+  // Read ROW1,COL_A, increment its (integer) value, write back
+  private TransactionState makeTransaction2() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+
+    int value = Bytes.toInt(row1_A.getValue(COL_A));
+
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(value + 1)));
+
+    return transactionState;
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java?rev=789195&view=auto
==============================================================================
    (empty)

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java?rev=789195&view=auto
==============================================================================
    (empty)

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,284 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/** JUnit test case for the transactional log (THLog). */
+public class TestTHLog extends HBaseTestCase implements
+    HConstants {
+  private Path dir;
+  private MiniDFSCluster cluster;
+
+  final byte[] tableName = Bytes.toBytes("tablename");
+  final HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+  final HRegionInfo regionInfo = new HRegionInfo(tableDesc,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+  final byte[] row1 = Bytes.toBytes("row1");
+  final byte[] val1 = Bytes.toBytes("val1");
+  final byte[] row2 = Bytes.toBytes("row2");
+  final byte[] val2 = Bytes.toBytes("val2");
+  final byte[] row3 = Bytes.toBytes("row3");
+  final byte[] val3 = Bytes.toBytes("val3");
+  final byte[] family = Bytes.toBytes("family");
+  final byte[] column = Bytes.toBytes("a");
+
+  @Override
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
+    // Set the hbase.rootdir to be the home directory in mini dfs.
+    this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
+        .getHomeDirectory().toString());
+    super.setUp();
+    this.dir = new Path("/hbase", getName());
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (this.fs.exists(this.dir)) {
+      this.fs.delete(this.dir, true);
+    }
+    shutdownDfs(cluster);
+    super.tearDown();
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testSingleCommit() throws IOException {
+
+    THLog log = new THLog(fs, dir, this.conf, null);
+    THLogRecoveryManager logRecoveryManager = new THLogRecoveryManager(fs,
+        regionInfo, conf);
+
+    // Write three updates under a single transaction, commit it, and verify
+    // that recovery returns all three.
+    long transactionId = 1;
+    log.writeStartToLog(regionInfo, transactionId);
+
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row1).add(family,
+        column, val1));
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row2).add(family,
+        column, val2));
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row3).add(family,
+        column, val3));
+
+    log.writeCommitToLog(regionInfo, transactionId);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<KeyValue>> commits = logRecoveryManager.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(1, commits.size());
+    assertTrue(commits.containsKey(transactionId));
+    assertEquals(3, commits.get(transactionId).size());
+
+    List<KeyValue> updates = commits.get(transactionId);
+
+    KeyValue update1 = updates.get(0);
+    assertTrue(Bytes.equals(row1, update1.getRow()));
+    assertTrue(Bytes.equals(val1, update1.getValue()));
+
+    KeyValue update2 = updates.get(1);
+    assertTrue(Bytes.equals(row2, update2.getRow()));
+    assertTrue(Bytes.equals(val2, update2.getValue()));
+
+    KeyValue update3 = updates.get(2);
+    assertTrue(Bytes.equals(row3, update3.getRow()));
+    assertTrue(Bytes.equals(val3, update3.getValue()));
+
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testSingleAbort() throws IOException {
+
+    THLog log = new THLog(fs, dir, this.conf, null);
+    THLogRecoveryManager logRecoveryManager = new THLogRecoveryManager(fs,
+        regionInfo, conf);
+
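+    // Same three updates as testSingleCommit, but the transaction aborts, so
+    // recovery should find no commits.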
+    long transactionId = 1;
+    log.writeStartToLog(regionInfo, transactionId);
+
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row1).add(family,
+        column, val1));
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row2).add(family,
+        column, val2));
+    log.writeUpdateToLog(regionInfo, transactionId, new Put(row3).add(family,
+        column, val3));
+
+    log.writeAbortToLog(regionInfo, transactionId);
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<KeyValue>> commits = logRecoveryManager.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(0, commits.size());
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testInterleavedCommits() throws IOException {
+
+    THLog log = new THLog(fs, dir, this.conf, null);
+    THLogRecoveryManager logManager = new THLogRecoveryManager(fs, regionInfo,
+        conf);
+
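+    // Interleave updates from two transactions in the log; both commit, so
+    // recovery should return both write sets.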
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+
+    log.writeStartToLog(regionInfo, transaction1Id);
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row1).add(family,
+        column, val1));
+
+    log.writeStartToLog(regionInfo, transaction2Id);
+    log.writeUpdateToLog(regionInfo, transaction2Id, new Put(row2).add(family,
+        column, val2));
+
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row3).add(family,
+        column, val3));
+
+    log.writeCommitToLog(regionInfo, transaction1Id);
+    log.writeCommitToLog(regionInfo, transaction2Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<KeyValue>> commits = logManager.getCommitsFromLog(filename,
+        -1, null);
+
+    assertEquals(2, commits.size());
+    assertEquals(2, commits.get(transaction1Id).size());
+    assertEquals(1, commits.get(transaction2Id).size());
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testInterleavedAbortCommit() throws IOException {
+
+    THLog log = new THLog(fs, dir, this.conf, null);
+    THLogRecoveryManager logManager = new THLogRecoveryManager(fs, regionInfo,
+        conf);
+
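+    // Transaction 2 aborts mid-stream; only transaction 1's two updates
+    // should survive recovery.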
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+
+    log.writeStartToLog(regionInfo, transaction1Id);
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row1).add(family,
+        column, val1));
+
+    log.writeStartToLog(regionInfo, transaction2Id);
+    log.writeUpdateToLog(regionInfo, transaction2Id, new Put(row2).add(family,
+        column, val2));
+    log.writeAbortToLog(regionInfo, transaction2Id);
+
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row3).add(family,
+        column, val3));
+
+    log.writeCommitToLog(regionInfo, transaction1Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<KeyValue>> commits = logManager.getCommitsFromLog(filename,
+        -1, null);
+
+    assertEquals(1, commits.size());
+    assertEquals(2, commits.get(transaction1Id).size());
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testInterleavedCommitAbort() throws IOException {
+
+    THLog log = new THLog(fs, dir, this.conf, null);
+    THLogRecoveryManager logManager = new THLogRecoveryManager(fs, regionInfo,
+        conf);
+
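+    // Transaction 2 commits mid-stream while transaction 1 later aborts; only
+    // transaction 2's single update should survive recovery.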
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+
+    log.writeStartToLog(regionInfo, transaction1Id);
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row1).add(family,
+        column, val1));
+
+    log.writeStartToLog(regionInfo, transaction2Id);
+    log.writeUpdateToLog(regionInfo, transaction2Id, new Put(row2).add(family,
+        column, val2));
+    log.writeCommitToLog(regionInfo, transaction2Id);
+
+    log.writeUpdateToLog(regionInfo, transaction1Id, new Put(row3).add(family,
+        column, val3));
+
+    log.writeAbortToLog(regionInfo, transaction1Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<KeyValue>> commits = logManager.getCommitsFromLog(filename,
+        -1, null);
+
+    assertEquals(1, commits.size());
+    assertEquals(1, commits.get(transaction2Id).size());
+  }
+
+  // FIXME Cannot do this test without a global transaction manager
+  // public void testMissingCommit() {
+  // fail();
+  // }
+
+  // FIXME Cannot do this test without a global transaction manager
+  // public void testMissingAbort() {
+  // fail();
+  // }
+
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLogRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLogRecovery.java?rev=789195&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLogRecovery.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLogRecovery.java Mon Jun 29 02:49:21 2009
@@ -0,0 +1,290 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestTHLogRecovery extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestTHLogRecovery.class);
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY_COLON = Bytes.toBytes("family:");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUAL_A = Bytes.toBytes("a");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+  private static final int TOTAL_VALUE = 10;
+
+  private HBaseAdmin admin;
+  private TransactionManager transactionManager;
+  private TransactionalTable table;
+
+  /** constructor */
+  public TestTHLogRecovery() {
+    super(2, false);
+
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+
+    // Set flush params so we don't get any automatic flushes.
+    // FIXME (defaults are probably fine)
+
+    // Copied from TestRegionServerExit
+    conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
+    conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
+    conf.setInt("hbase.client.pause", 10000); // increase client timeout
+    conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)),
+        true);
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY_COLON));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+    HBaseBackedTransactionLogger.createTable();
+
+    transactionManager = new TransactionManager(
+        new HBaseBackedTransactionLogger(), conf);
+    writeInitialRows();
+  }
+
+  private void writeInitialRows() throws IOException {
+
+    table.put(new Put(ROW1).add(FAMILY, QUAL_A, Bytes.toBytes(TOTAL_VALUE)));
+    table.put(new Put(ROW2).add(FAMILY, QUAL_A, Bytes.toBytes(0)));
+    table.put(new Put(ROW3).add(FAMILY, QUAL_A, Bytes.toBytes(0)));
+  }
+
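+  // Each test commits a transaction, aborts the region server hosting the
+  // table, and then verifies that the committed state is recovered from the
+  // transactional log when the region comes back online.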
+  public void testWithoutFlush() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitialRows();
+    TransactionState state1 = makeTransaction(false);
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  public void testWithFlushBeforeCommit() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitialRows();
+    TransactionState state1 = makeTransaction(false);
+    flushRegionServer();
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  // FIXME, TODO
+  // public void testWithFlushBetweenTransactionWrites() {
+  // fail();
+  // }
+
+  private void flushRegionServer() {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
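+    // Find the region server hosting TABLE_NAME's region and ask it to flush.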
+    HRegion region = null;
+    int server = -1;
+    for (int i = 0; i < regionThreads.size() && server == -1; i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      for (HRegion r : regions) {
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+          region = r;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    ((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
+        .getFlushRequester().request(region);
+  }
+
+  /**
+   * Stop the region server serving TABLE_NAME.
+   * 
+   * @param abort set to true if the region server should be aborted; if false
+   * it is just shut down.
+   */
+  private void stopOrAbortRegionServer(final boolean abort) {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
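+    // Locate the server index hosting TABLE_NAME before stopping or aborting
+    // it.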
+    int server = -1;
+    for (int i = 0; i < regionThreads.size(); i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      LOG.info("server: " + regionThreads.get(i).getName());
+      for (HRegion r : regions) {
+        LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    if (abort) {
+      this.cluster.abortRegionServer(server);
+
+    } else {
+      this.cluster.stopRegionServer(server, false);
+    }
+    LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
+        + (abort ? "aborted" : "shut down"));
+  }
+
+  private void verify(final int numRuns) throws IOException {
+    // Reads
+    int row1 = Bytes.toInt(table.get(new Get(ROW1).addColumn(COL_A)).getValue(
+        COL_A));
+    int row2 = Bytes.toInt(table.get(new Get(ROW2).addColumn(COL_A)).getValue(
+        COL_A));
+    int row3 = Bytes.toInt(table.get(new Get(ROW3).addColumn(COL_A)).getValue(
+        COL_A));
+
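+    // Each committed transaction moves 2 out of ROW1 and adds 1 each to ROW2
+    // and ROW3.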
+    assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
+    assertEquals(numRuns, row2);
+    assertEquals(numRuns, row3);
+  }
+
+  // Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3
+  private TransactionState makeTransaction(final boolean flushMidWay)
+      throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    // Reads
+    int row1 = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).getValue(COL_A));
+    int row2 = Bytes.toInt(table.get(transactionState,
+        new Get(ROW2).addColumn(COL_A)).getValue(COL_A));
+    int row3 = Bytes.toInt(table.get(transactionState,
+        new Get(ROW3).addColumn(COL_A)).getValue(COL_A));
+
+    row1 -= 2;
+    row2 += 1;
+    row3 += 1;
+
+    if (flushMidWay) {
+      flushRegionServer();
+    }
+
+    // Writes
+    Put write = new Put(ROW1);
+    write.add(FAMILY, QUAL_A, Bytes.toBytes(row1));
+    table.put(transactionState, write);
+
+    write = new Put(ROW2);
+    write.add(FAMILY, QUAL_A, Bytes.toBytes(row2));
+    table.put(transactionState, write);
+
+    write = new Put(ROW3);
+    write.add(FAMILY, QUAL_A, Bytes.toBytes(row3));
+    table.put(transactionState, write);
+
+    return transactionState;
+  }
+
+  /*
+   * Run verification in a thread so I can concurrently run a thread-dumper
+   * while we're waiting (because in this test sometimes the meta scanner looks
+   * to be stuck). @param numRuns Number of committed transactions to expect.
+   * @return Verification thread; caller needs to call start on it.
+   */
+  private Thread startVerificationThread(final int numRuns) {
+    Runnable runnable = new Runnable() {
+      public void run() {
+        try {
+          // Now try to open a scanner on the meta table. Should stall until
+          // meta server comes back up.
+          HTable t = new HTable(conf, TABLE_NAME);
+          Scan s = new Scan();
+          s.addColumn(FAMILY, QUAL_A);
+          ResultScanner scanner = t.getScanner(s);
+          scanner.close();
+
+        } catch (IOException e) {
+          LOG.fatal("could not re-open meta table because", e);
+          fail();
+        }
+
+        try {
+          verify(numRuns);
+          LOG.info("Success!");
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail();
+        }
+      }
+    };
+    return new Thread(runnable);
+  }
+}


