hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r925047 - in /hadoop/hbase/branches/0.20: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/ src/java/org/apache/h...
Date Thu, 18 Mar 2010 23:51:49 GMT
Author: stack
Date: Thu Mar 18 23:51:49 2010
New Revision: 925047

URL: http://svn.apache.org/viewvc?rev=925047&view=rev
Log:
HBASE-2283 row level atomicity

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Mar 18 23:51:49 2010
@@ -45,6 +45,7 @@ Release 0.20.4 - Unreleased
    HBASE-2305  Client port for ZK has no default (Suraj Varma via Stack)
    HBASE-2323  filter.RegexStringComparator does not work with certain bytes
                (Benoit Sigoure via Stack)
+   HBASE-2283  row level atomicity (Kannan Muthukkaruppan via Stack)
 
   IMPROVEMENTS
    HBASE-2180  Bad read performance from synchronizing hfile.fddatainputstream

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java Thu Mar 18 23:51:49 2010
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HLogKey;
 import org.apache.hadoop.hbase.regionserver.LogRollListener;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.io.SequenceFile;
 
 /**
@@ -49,7 +50,7 @@ class THLog extends HLog {
 
   @Override
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return super.createWriter(path, THLogKey.class, KeyValue.class);
+    return super.createWriter(path, THLogKey.class, WALEdit.class);
   }
 
   @Override
@@ -90,16 +91,17 @@ class THLog extends HLog {
    */
   public void append(HRegionInfo regionInfo, long now, THLogKey.TrxOp txOp,
       long transactionId) throws IOException {
-    THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
-        .getTableDesc().getName(), -1, now, txOp, transactionId);
-    super.append(regionInfo, key, new KeyValue(new byte [0], 0, 0)); // Empty KeyValue 
+    THLogKey key = new THLogKey(regionInfo.getRegionName(),
+      regionInfo.getTableDesc().getName(), -1, now, txOp, transactionId);
+    WALEdit e = new WALEdit();
+    e.add(new KeyValue(new byte [0], 0, 0)); // Empty KeyValue
+    super.append(regionInfo, key, e, regionInfo.isMetaRegion());
   }
 
   /**
    * Write a transactional update to the log.
    * 
    * @param regionInfo
-   * @param now
    * @param update
    * @param transactionId
    * @throws IOException
@@ -114,7 +116,9 @@ class THLog extends HLog {
         transactionId);
 
     for (KeyValue value : convertToKeyValues(update)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
     }
   }
 
@@ -122,8 +126,7 @@ class THLog extends HLog {
    * Write a transactional delete to the log.
    * 
    * @param regionInfo
-   * @param now
-   * @param update
+   * @param delete
    * @param transactionId
    * @throws IOException
    */
@@ -137,7 +140,9 @@ class THLog extends HLog {
         transactionId);
 
     for (KeyValue value : convertToKeyValues(delete)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
     }
   }
 

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java Thu Mar 18 23:51:49 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.Progressable;
@@ -86,7 +87,7 @@ class THLogRecoveryManager {
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
-  public Map<Long, List<KeyValue>> getCommitsFromLog(
+  public Map<Long, List<WALEdit>> getCommitsFromLog(
       final Path reconstructionLog, final long maxSeqID,
       final Progressable reporter) throws UnsupportedEncodingException,
       IOException {
@@ -102,7 +103,8 @@ class THLogRecoveryManager {
       return null;
     }
 
-    SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> pendingTransactionsById =
+        new TreeMap<Long, List<WALEdit>>();
     Set<Long> commitedTransactions = new HashSet<Long>();
     Set<Long> abortedTransactions = new HashSet<Long>();
 
@@ -111,7 +113,7 @@ class THLogRecoveryManager {
     
       try {
       THLogKey key = new THLogKey();
-      KeyValue val = new KeyValue();
+      WALEdit val = new WALEdit();
       long skippedEdits = 0;
       long totalEdits = 0;
       long startCount = 0;
@@ -119,6 +121,9 @@ class THLogRecoveryManager {
       long abortCount = 0;
       long commitCount = 0;
       // How many edits to apply before we send a progress report.
+
+
+
       int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
           2000);
 
@@ -137,18 +142,18 @@ class THLogRecoveryManager {
         }
         long transactionId = key.getTransactionId();
 
-        List<KeyValue> updates = pendingTransactionsById.get(transactionId);
+        List<WALEdit> updates = pendingTransactionsById.get(transactionId);
         switch (key.getTrxOp()) {
 
         case OP:
           if (updates == null) {
-              updates = new ArrayList<KeyValue>();
+              updates = new ArrayList<WALEdit>();
               pendingTransactionsById.put(transactionId, updates);
               startCount++;
           }
 
           updates.add(val);
-          val = new KeyValue();
+          val = new WALEdit();
           writeCount++;
           break;
 
@@ -210,15 +215,16 @@ class THLogRecoveryManager {
     return null;
   }
   
-  private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
-      SortedMap<Long, List<KeyValue>> pendingTransactionsById
+  private SortedMap<Long, List<WALEdit>> resolvePendingTransaction(
+      SortedMap<Long, List<WALEdit>> pendingTransactionsById
       ) {
-    SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> commitedTransactionsById =
+      new TreeMap<Long, List<WALEdit>>();
     
     LOG.info("Region log has " + pendingTransactionsById.size()
         + " unfinished transactions. Going to the transaction log to resolve");
 
-    for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
+    for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) {
       if (entry.getValue().isEmpty()) {
         LOG.debug("Skipping resolving trx ["+entry.getKey()+"] has no writes.");
       }

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Thu Mar 18 23:51:49 2010
@@ -54,12 +54,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
-import org.apache.hadoop.hbase.regionserver.HLog;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.*;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.util.Progressable;
 
@@ -142,20 +137,23 @@ public class TransactionalRegion extends
     }
         
     THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
-    Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
+    Map<Long, List<WALEdit>> commitedTransactionsById = recoveryManager
         .getCommitsFromLog(oldLogFile, minSeqId, reporter);
 
     if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
       LOG.debug("found " + commitedTransactionsById.size()
           + " COMMITED transactions to recover.");
 
-      for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
+      for (Entry<Long, List<WALEdit>> entry : commitedTransactionsById
           .entrySet()) {
         LOG.debug("Writing " + entry.getValue().size()
             + " updates for transaction " + entry.getKey());
-        for (KeyValue b : entry.getValue()) {
-          Put put = new Put(b.getRow());
-          put.add(b);
+        for (WALEdit b : entry.getValue()) {
+          Put put = null;
+          for (KeyValue kv: b.getKeyValues()) {
+            if (put == null) put = new Put(kv.getRow());
+            put.add(kv);
+          }
           super.put(put, true); // These are walled so they live forever
         }
       }

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Thu Mar 18 23:51:49 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -100,7 +101,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
     assertNull(commits);
@@ -130,7 +131,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
     assertNull(commits);
@@ -165,7 +166,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);
@@ -200,7 +201,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);
@@ -235,7 +236,7 @@ public class TestTHLog extends HBaseTest
     log.close();
     Path filename = log.computeFilename(log.getFilenum());
 
-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
     assertNull(commits);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java Thu Mar 18 23:51:49 2010
@@ -1794,14 +1794,22 @@ public class KeyValue implements Writabl
         (2 * Bytes.SIZEOF_INT));
   }
   
-  // Writable
-  public void readFields(final DataInput in) throws IOException {
-    this.length = in.readInt();
+  // this overload assumes that the length bytes have already been read,
+  // and it expects the length of the KeyValue to be explicitly passed
+  // to it.
+  public void readFields(int length, final DataInput in) throws IOException {
+    this.length = length;
     this.offset = 0;
     this.bytes = new byte[this.length];
     in.readFully(this.bytes, 0, this.length);
   }
   
+  // Writable
+  public void readFields(final DataInput in) throws IOException {
+    int length = in.readInt();
+    readFields(length, in);
+  }
+  
   public void write(final DataOutput out) throws IOException {
     out.writeInt(this.length);
     out.write(this.bytes, this.offset, this.length);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Thu Mar 18 23:51:49 2010
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -439,7 +440,7 @@ public class HLog implements HConstants,
   }
 
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return createWriter(path, HLogKey.class, KeyValue.class);
+    return createWriter(path, HLogKey.class, WALEdit.class);
   }
   
   // usage: see TestLogRolling.java
@@ -448,7 +449,7 @@ public class HLog implements HConstants,
   }
 
   protected SequenceFile.Writer createWriter(Path path,
-      Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
+      Class<? extends HLogKey> keyClass, Class<? extends WALEdit> valueClass)
       throws IOException {
     return SequenceFile.createWriter(this.fs, this.conf, path, keyClass,
         valueClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs
@@ -637,12 +638,13 @@ public class HLog implements HConstants,
    * @param now Time of this edit write.
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit,
-    final long now)
+  public void append(HRegionInfo regionInfo, WALEdit logEdit,
+    final long now,
+    final boolean isMetaRegion)
   throws IOException {
     byte [] regionName = regionInfo.getRegionName();
     byte [] tableName = regionInfo.getTableDesc().getName();
-    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
+    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
   }
 
   /**
@@ -664,7 +666,8 @@ public class HLog implements HConstants,
    * @param logKey
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
+                     final boolean isMetaRegion)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -683,6 +686,10 @@ public class HLog implements HConstants,
       this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
     }
+
+    // sync txn to file system
+    this.sync(isMetaRegion);
+
     if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
         listener.logRollRequested();
@@ -714,29 +721,34 @@ public class HLog implements HConstants,
    * @param now
    * @throws IOException
    */
-  public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    final long now)
+  public void append(byte [] regionName, byte [] tableName, WALEdit edits,
+    final long now, final boolean isMetaRegion)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum [] = obtainSeqNum(edits.size());
+    long seqNum = obtainSeqNum();
+
     synchronized (this.updateLock) {
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
+      this.lastSeqWritten.putIfAbsent(regionName, seqNum);
       int counter = 0;
-      for (KeyValue kv: edits) {
-        HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
-        doWrite(logKey, kv);
-        this.numEntries.incrementAndGet();
-      }
+
+      HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
+      doWrite(logKey, edits);
+      this.numEntries.incrementAndGet();
+
       // Only count 1 row as an unflushed entry
       this.unflushedEntries.incrementAndGet();
     }
+
+    // sync txn to file system
+    this.sync(isMetaRegion);
+
     if (this.editsSize.get() > this.logrollsize) {
         requestLogRoll();
     }
@@ -920,7 +932,7 @@ public class HLog implements HConstants,
     }
   }
   
-  private void doWrite(HLogKey logKey, KeyValue logEdit)
+  private void doWrite(HLogKey logKey, WALEdit logEdit)
   throws IOException {
     if (!this.enabled) {
       return;
@@ -1002,7 +1014,8 @@ public class HLog implements HConstants,
    * @throws IOException
    */
   public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId)
+    final long logSeqId,
+    final boolean isMetaRegion)
   throws IOException {
     try {
       if (this.closed) {
@@ -1010,8 +1023,11 @@ public class HLog implements HConstants,
       }
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
+        KeyValue edit = completeCacheFlushLogEdit();
+        WALEdit edits = new WALEdit();
+        edits.add(edit);
         this.writer.append(makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()), 
-            completeCacheFlushLogEdit());
+          edits);
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
@@ -1020,6 +1036,9 @@ public class HLog implements HConstants,
           this.lastSeqWritten.remove(regionName);
         }
       }
+      // sync txn to file system
+      this.sync(isMetaRegion);
+
     } finally {
       this.cacheFlushLock.unlock();
     }
@@ -1170,7 +1189,7 @@ public class HLog implements HConstants,
             in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
             try {
               HLogKey key = newKey(conf);
-              KeyValue val = new KeyValue();
+              WALEdit val = new WALEdit();
               while (in.next(key, val)) {
                 byte [] regionName = key.getRegionName();
                 LinkedList<HLogEntry> queue = logEntries.get(regionName);
@@ -1185,7 +1204,7 @@ public class HLog implements HConstants,
                 // Make the key and value new each time; otherwise same instance
                 // is used over and over.
                 key = newKey(conf);
-                val = new KeyValue();
+                val = new WALEdit();
               }
               LOG.debug("Pushed=" + count + " entries from " +
                 logfiles[i].getPath());
@@ -1254,7 +1273,7 @@ public class HLog implements HConstants,
                     }
                     SequenceFile.Writer w =
                       SequenceFile.createWriter(fs, conf, logfile,
-                        getKeyClass(conf), KeyValue.class, getCompressionType(conf));
+                        getKeyClass(conf), WALEdit.class, getCompressionType(conf));
                     wap = new WriterAndPath(logfile, w);
                     logWriters.put(key, wap);
                     if (LOG.isDebugEnabled()) {
@@ -1265,13 +1284,15 @@ public class HLog implements HConstants,
                     if (old != null) {
                       // Copy from existing log file
                       HLogKey oldkey = newKey(conf);
-                      KeyValue oldval = new KeyValue();
+                      WALEdit oldval = new WALEdit();
                       for (; old.next(oldkey, oldval); count++) {
                         if (LOG.isDebugEnabled() && count > 0
                             && count % 10000 == 0) {
                           LOG.debug("Copied " + count + " edits");
                         }
                         w.append(oldkey, oldval);
+                        oldkey = newKey(conf);
+                        oldval = new WALEdit();
                       }
                       old.close();
                       fs.delete(oldlogfile, true);
@@ -1339,14 +1360,14 @@ public class HLog implements HConstants,
    * Only used when splitting logs
    */
   public static class HLogEntry {
-    private KeyValue edit;
+    private WALEdit edit;
     private HLogKey key;
     /**
      * Constructor for both params
      * @param edit log's edit
      * @param key log's key
      */
-    public HLogEntry(KeyValue edit, HLogKey key) {
+    public HLogEntry(WALEdit edit, HLogKey key) {
       super();
       this.edit = edit;
       this.key = key;
@@ -1355,7 +1376,7 @@ public class HLog implements HConstants,
      * Gets the edit
      * @return edit
      */
-    public KeyValue getEdit() {
+    public WALEdit getEdit() {
       return edit;
     }
     /**
@@ -1387,6 +1408,13 @@ public class HLog implements HConstants,
     if (!append) {
       return;
     }
+
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
     // Trying recovery
     boolean recovered = false;
     while (!recovered) {

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Mar 18 23:51:49 2010
@@ -57,6 +57,7 @@ package org.apache.hadoop.hbase.regionse
  import java.util.AbstractList;
  import java.util.ArrayList;
  import java.util.Collection;
+ import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@ -69,7 +70,7 @@ package org.apache.hadoop.hbase.regionse
  import java.util.concurrent.ConcurrentSkipListMap;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
  /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -1015,7 +1016,8 @@ public class HRegion implements HConstan
     //     and that all updates to the log for this regionName that have lower 
     //     log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(getRegionName(),
-        regionInfo.getTableDesc().getName(), completeSequenceId);
+        regionInfo.getTableDesc().getName(), completeSequenceId,
+        this.getRegionInfo().isMetaRegion());
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1184,11 +1186,10 @@ public class HRegion implements HConstan
           checkFamily(family);
         }
       }
-      
-      for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
-        byte [] family = e.getKey();
-        delete(family, e.getValue(), writeToWAL);
-      }
+
+      // All edits for the given row (across all column families) must happen atomically.
+      delete(delete.getFamilyMap(), writeToWAL);
+
     } finally {
       if(lockid == null) releaseRowLock(lid);
       splitsAndClosesLock.readLock().unlock();
@@ -1198,12 +1199,11 @@ public class HRegion implements HConstan
   
   
   /**
-   * @param family
-   * @param kvs
+   * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
    * @throws IOException
    */
-  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
+  public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
   throws IOException {
     long now = System.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
@@ -1212,38 +1212,70 @@ public class HRegion implements HConstan
 
     try {
       if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs, now);
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
+
+        // bunch up all edits across all column families into a
+        // single WALEdit.
+        WALEdit walEdit = new WALEdit();
+        for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+          List<KeyValue> kvs = e.getValue();
+          for (KeyValue kv : kvs) {
+            walEdit.add(kv);
+          }
+        }
+        // append the edit to WAL. The append also does the sync.
+        if (!walEdit.isEmpty()) {
+          this.log.append(regionInfo.getRegionName(),
+                          regionInfo.getTableDesc().getName(),
+                          walEdit, now,
+                          this.getRegionInfo().isMetaRegion());
+        }
       }
+
       long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: kvs) {
-        // Check if time is LATEST, change to time of most recent addition if so
-        // This is expensive.
-        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-          List<KeyValue> result = new ArrayList<KeyValue>(1);
-          Get g = new Get(kv.getRow());
-          NavigableSet<byte []> qualifiers =
-            new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-          byte [] q = kv.getQualifier();
-          if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
-          qualifiers.add(q);
-          get(store, g, qualifiers, result);
-          if (result.isEmpty()) {
-            // Nothing to delete
-            continue;
-          }
-          if (result.size() > 1) {
-            throw new RuntimeException("Unexpected size: " + result.size());
+
+      //
+      // Now make changes to the memstore.
+      //
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+        byte[] family = e.getKey(); 
+        List<KeyValue> kvs = e.getValue();
+        
+        Store store = getStore(family);
+        for (KeyValue kv: kvs) {
+          //  Check if time is LATEST, change to time of most recent addition if so
+          //  This is expensive.
+          if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+            List<KeyValue> result = new ArrayList<KeyValue>(1);
+            Get g = new Get(kv.getRow());
+            NavigableSet<byte []> qualifiers =
+              new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+              byte [] q = kv.getQualifier();
+              if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
+              qualifiers.add(q);
+              get(store, g, qualifiers, result);
+              if (result.isEmpty()) {
+                // Nothing to delete
+                continue;
+              }
+              if (result.size() > 1) {
+                throw new RuntimeException("Unexpected size: " + result.size());
+              }
+              KeyValue getkv = result.get(0);
+              Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+                  getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+          } else {
+            kv.updateLatestStamp(byteNow);
           }
-          KeyValue getkv = result.get(0);
-          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-        } else {
-          kv.updateLatestStamp(byteNow);
+          size = this.memstoreSize.addAndGet(store.delete(kv));
         }
-
-        size = this.memstoreSize.addAndGet(store.delete(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1309,15 +1341,8 @@ public class HRegion implements HConstan
       Integer lid = getLock(lockid, row);
       byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
-        for (Map.Entry<byte[], List<KeyValue>> entry:
-            put.getFamilyMap().entrySet()) {
-          byte [] family = entry.getKey();
-          checkFamily(family);
-          List<KeyValue> puts = entry.getValue();
-          if (updateKeys(puts, now)) {
-            put(family, puts, writeToWAL);
-          }
-        }
+        // All edits for the given row (across all column families) must happen atomically.
+        put(put.getFamilyMap(), writeToWAL);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1377,16 +1402,9 @@ public class HRegion implements HConstan
           matches = Bytes.equals(expectedValue, actualValue);
         }
         //If matches put the new put
-        if(matches) {
-          for(Map.Entry<byte[], List<KeyValue>> entry :
-            put.getFamilyMap().entrySet()) {
-            byte [] fam = entry.getKey();
-            checkFamily(fam);
-            List<KeyValue> puts = entry.getValue();
-            if(updateKeys(puts, now)) {
-              put(fam, puts, writeToWAL);
-            }
-          }
+        if (matches) {
+          // All edits for the given row (across all column families) must happen atomically.
+          put(put.getFamilyMap(), writeToWAL);
           return true;  
         }
         return false;
@@ -1496,34 +1514,76 @@ public class HRegion implements HConstan
    */
   private void put(final byte [] family, final List<KeyValue> edits)
   throws IOException {
-    this.put(family, edits, true);
+    Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
+    familyMap.put(family, edits);
+    this.put(familyMap, true);
   }
 
   /** 
    * Add updates first to the hlog (if writeToWal) and then add values to memstore.
    * Warning: Assumption is caller has lock on passed in row.
-   * @param family
-   * @param edits
+   * @param familyMap map of family to edits for the given family.
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void put(final byte [] family, final List<KeyValue> edits, 
+  private void put(final Map<byte [], List<KeyValue>> familyMap,
       boolean writeToWAL) throws IOException {
-    if (edits == null || edits.isEmpty()) {
-      return;
-    }
+    long now = System.currentTimeMillis();
+    byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
     try {
-      if (writeToWAL) {
-        long now = System.currentTimeMillis();
+
+      WALEdit walEdit = new WALEdit();
+      
+      // check if column families are valid;
+      // check if any timestamp updates are needed;
+      // and if writeToWAL is set, then also collapse edits into a single list.
+      for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
+        List<KeyValue> edits = e.getValue();
+        byte[] family = e.getKey();
+
+        // is this a valid column family?
+        checkFamily(family);
+
+        // update timestamp on keys if required.
+        if (updateKeys(edits, byteNow)) {
+          if (writeToWAL) {
+            // bunch up all edits across all column families into a 
+            // single WALEdit.
+            for (KeyValue kv : edits) {
+              walEdit.add(kv);
+            }
+          }
+        }
+      }
+
+      // append to and sync WAL
+      if (!walEdit.isEmpty()) {
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits, now);
+            regionInfo.getTableDesc().getName(),
+            walEdit, now,
+            this.getRegionInfo().isMetaRegion());
       }
+      
       long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: edits) {
-        size = this.memstoreSize.addAndGet(store.add(kv));
+
+      // now make changes to the memstore
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey(); 
+        List<KeyValue> edits = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: edits) {
+          size = this.memstoreSize.addAndGet(store.add(kv));
+        }
       }
       flush = isFlushSize(size);
     } finally {
@@ -2485,10 +2545,11 @@ public class HRegion implements HConstan
       // now log it:
       if (writeToWAL) {
         long now = System.currentTimeMillis();
-        List<KeyValue> edits = new ArrayList<KeyValue>(1);
-        edits.add(newKv);
+        WALEdit walEdit = new WALEdit();
+        walEdit.add(newKv);
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), edits, now);
+          regionInfo.getTableDesc().getName(), walEdit, now,
+          this.getRegionInfo().isMetaRegion());
       }
 
       // Now request the ICV to the store, this will set the timestamp
@@ -2543,7 +2604,6 @@ public class HRegion implements HConstan
         (5 * Bytes.SIZEOF_BOOLEAN)) +
         (3 * ClassSize.REENTRANT_LOCK));
   
-  @Override
   public long heapSize() {
     long heapSize = DEEP_OVERHEAD;
     for(Store store : this.stores.values()) {

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Mar 18 23:51:49 2010
@@ -1763,7 +1763,6 @@ public class HRegionServer implements HC
       }
       region.put(put, getLockFromId(put.getLockId()));
       
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1796,9 +1795,7 @@ public class HRegionServer implements HC
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
-    // All have been processed successfully.
-    this.hlog.sync(isMetaRegion);
-    
+
     if (i == puts.length) {
       return -1;
     } else {
@@ -1834,7 +1831,6 @@ public class HRegionServer implements HC
       boolean retval = region.checkAndPut(row, family, qualifier, value, put,
           getLockFromId(put.getLockId()), true);
 
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
       return retval;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -1996,7 +1992,6 @@ public class HRegionServer implements HC
       }
       Integer lid = getLockFromId(delete.getLockId());
       region.delete(delete, lid, writeToWAL);
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -2030,7 +2025,6 @@ public class HRegionServer implements HC
       throw convertThrowableToIOE(cleanup(t));
     }
     
-    this.hlog.sync(isMetaRegion);
     // All have been processed successfully.
     return -1;
   }
@@ -2489,7 +2483,6 @@ public class HRegionServer implements HC
       long retval = region.incrementColumnValue(row, family, qualifier, amount, 
           writeToWAL);
       
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
       return retval;
     } catch (IOException e) {
       checkFileSystem();

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Mar 18 23:51:49 2010
@@ -291,7 +291,7 @@ public class Store implements HConstants
    *
    * We can ignore any log message that has a sequence ID that's equal to or 
    * lower than maxSeqID.  (Because we know such log messages are already 
-   * reflected in the MapFiles.)
+   * reflected in the HFiles.)
    *
    * @return the new max sequence id as per the log, or -1 if no log recovered
    */
@@ -320,13 +320,22 @@ public class Store implements HConstants
       reconstructionLog, this.conf);
     try {
       HLogKey key = HLog.newKey(conf);
-      KeyValue val = new KeyValue();
+      WALEdit edits = new WALEdit();
       long skippedEdits = 0;
       long editsCount = 0;
       // How many edits to apply before we send a progress report.
       int reportInterval =
         this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
-      while (logReader.next(key, val)) {
+
+      // TBD: Need to add an exception handler around logReader.next.
+      //
+      // A transaction now appears as a single edit. If logReader.next()
+      // throws an exception, then it must be an incomplete/partial
+      // transaction at the end of the file. Rather than bubble up
+      // the exception, we should catch it and simply ignore the
+      // partial transaction during this recovery phase.
+      //
+      while (logReader.next(key, edits)) {
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
@@ -335,15 +344,19 @@ public class Store implements HConstants
           skippedEdits++;
           continue;
         }
-        // Check this edit is for me. Also, guard against writing the special
-        // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (val.matchingFamily(HLog.METAFAMILY) ||
-          !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
-          !val.matchingFamily(family.getName())) {
-          continue;
+        for (KeyValue kv : edits.getKeyValues())
+        {
+          // Check this edit is for me. Also, guard against writing the special
+          // METACOLUMN info such as HBASE::CACHEFLUSH entries
+          if (kv.matchingFamily(HLog.METAFAMILY) ||
+              !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
+              !kv.matchingFamily(family.getName())) {
+              continue;
+            }
+          // Add anything as value as long as we use same instance each time.
+          reconstructedCache.add(kv);
         }
-        // Add anything as value as long as we use same instance each time.
-        reconstructedCache.add(val);
+
         editsCount++;
         // Every 2k edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
@@ -351,7 +364,7 @@ public class Store implements HConstants
           reporter.progress();
         }
         // Instantiate a new KeyValue to perform Writable on
-        val = new KeyValue();
+        edits = new WALEdit();
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
@@ -362,7 +375,7 @@ public class Store implements HConstants
     } finally {
       logReader.close();
     }
-    
+
     if (reconstructedCache.size() > 0) {
       // We create a "virtual flush" at maxSeqIdInLog+1.
       if (LOG.isDebugEnabled()) {

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Thu Mar 18 23:51:49 2010
@@ -102,12 +102,12 @@ public class TestHLog extends HBaseTestC
       for (int ii = 0; ii < howmany; ii++) {
         for (int i = 0; i < howmany; i++) {
           for (int j = 0; j < howmany; j++) {
-            List<KeyValue> edit = new ArrayList<KeyValue>();
+            WALEdit edits = new WALEdit();
             byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
-            edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
+            edits.add(new KeyValue(rowName, column, System.currentTimeMillis(),
               column));
-            System.out.println("Region " + i + ": " + edit);
-            log.append(Bytes.toBytes("" + i), tableName, edit, System.currentTimeMillis());
+            System.out.println("Region " + i + ": " + edits);
+            log.append(Bytes.toBytes("" + i), tableName, edits, System.currentTimeMillis(), false);
           }
         }
         log.rollWriter();
@@ -131,11 +131,11 @@ public class TestHLog extends HBaseTestC
         new SequenceFile.Reader(this.fs, splits.get(i), this.conf);
       try {
         HLogKey key = new HLogKey();
-        KeyValue kv = new KeyValue();
+        WALEdit kvs = new WALEdit();
         int count = 0;
         String previousRegion = null;
         long seqno = -1;
-        while(r.next(key, kv)) {
+        while(r.next(key, kvs)) {
           String region = Bytes.toString(key.getRegionName());
           // Assert that all edits are for same region.
           if (previousRegion != null) {
@@ -144,8 +144,10 @@ public class TestHLog extends HBaseTestC
           assertTrue(seqno < key.getLogSeqNum());
           seqno = key.getLogSeqNum();
           previousRegion = region;
-          System.out.println(key + " " + kv);
+          System.out.println(key + " " + kvs);
           count++;
+          key = new HLogKey();
+          kvs = new WALEdit();
         }
         assertEquals(howmany * howmany, count);
       } finally {
@@ -168,31 +170,39 @@ public class TestHLog extends HBaseTestC
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...
       long timestamp = System.currentTimeMillis();
-      List<KeyValue> cols = new ArrayList<KeyValue>();
+      WALEdit cols = new WALEdit();
       for (int i = 0; i < COL_COUNT; i++) {
         cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      log.append(regionName, tableName, cols, System.currentTimeMillis());
+      log.append(regionName, tableName, cols, System.currentTimeMillis(), false);
       long logSeqId = log.startCacheFlush();
-      log.completeCacheFlush(regionName, tableName, logSeqId);
+      log.completeCacheFlush(regionName, tableName, logSeqId, false);
       log.close();
       Path filename = log.computeFilename(log.getFilenum());
       log = null;
       // Now open a reader on the log and assert append worked.
       reader = new SequenceFile.Reader(fs, filename, conf);
       HLogKey key = new HLogKey();
-      KeyValue val = new KeyValue();
-      for (int i = 0; i < COL_COUNT; i++) {
-        reader.next(key, val);
+      WALEdit vals = new WALEdit();
+      reader.next(key, vals);
+      assertEquals(COL_COUNT, vals.size());
+      int idx = 0;
+      for (KeyValue val : vals.getKeyValues()) {
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
-        assertEquals((byte)(i + '0'), val.getValue()[0]);
+        assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(key + " " + val);
+        idx++;
       }
-      while (reader.next(key, val)) {
-        // Assert only one more row... the meta flushed row.
+
+      // Get next row... the meta flushed row.
+      key = new HLogKey();
+      vals = new WALEdit();
+      reader.next(key, vals);
+      assertEquals(1, vals.size());
+      for (KeyValue val : vals.getKeyValues()) {
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=925047&r1=925046&r2=925047&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Mar 18 23:51:49 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.Arrays;
 
@@ -385,7 +387,9 @@ public class TestHRegion extends HBaseTe
     //testing existing family
     byte [] family = fam2;
     try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
     } catch (Exception e) {
       assertTrue("Family " +new String(family)+ " does not exist", false);
     }
@@ -394,7 +398,9 @@ public class TestHRegion extends HBaseTe
     boolean ok = false; 
     family = fam4;
     try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
     } catch (Exception e) {
       ok = true;
     }
@@ -595,7 +601,9 @@ public class TestHRegion extends HBaseTe
     kvs.add(new KeyValue(row1, fam1, col2, null));
     kvs.add(new KeyValue(row1, fam1, col3, null));
 
-    region.delete(fam1, kvs, true);
+    Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+    deleteMap.put(fam1, kvs);
+    region.delete(deleteMap, true);
 
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...



Mime
View raw message