hbase-commits mailing list archives

From te...@apache.org
Subject svn commit: r1177815 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/regionserve...
Date Fri, 30 Sep 2011 20:22:22 GMT
Author: tedyu
Date: Fri Sep 30 20:22:22 2011
New Revision: 1177815

URL: http://svn.apache.org/viewvc?rev=1177815&view=rev
Log:
HBASE-4487  The increment operation can release the rowlock before sync-ing
               the Hlog (dhruba borthakur)
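
In outline, the fix moves the WAL sync out of the row lock: the increment
appends its edit to the HLog without forcing it to disk while the row lock
is held, then releases the lock and blocks on the sync before returning to
the client. A minimal sketch of that ordering (appendNoSync() and sync(txid)
are the methods this commit adds; the other helper names are illustrative
stand-ins, not the real HRegion internals):

    long txid = 0;
    Integer lid = obtainRowLock(row);   // serialize writers on this row
    try {
      applyToMemstore(kvs);             // update in-memory state
      if (writeToWAL) {
        // append the WAL entry, but do NOT sync it to HDFS yet
        txid = this.log.appendNoSync(regionInfo, tableName, walEdits,
            HConstants.DEFAULT_CLUSTER_ID, now, htd);
      }
    } finally {
      releaseRowLock(lid);              // other writers of this row proceed
    }
    if (writeToWAL) {
      // durability point: block until txid (and anything batched with it)
      // is on disk, without holding the row lock
      this.log.sync(txid);
    }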

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1177815&r1=1177814&r2=1177815&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Sep 30 20:22:22 2011
@@ -568,6 +568,8 @@ Release 0.92.0 - Unreleased
                (Chris Trezzo via JD)
    HBASE-2794  Utilize ROWCOL bloom filter if multiple columns within same family
                are requested in a Get (Mikhail Bautin)
+   HBASE-4487  The increment operation can release the rowlock before sync-ing
+               the Hlog (dhruba borthakur)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1177815&r1=1177814&r2=1177815&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Sep 30 20:22:22 2011
@@ -3526,6 +3526,7 @@ public class HRegion implements HeapSize
     List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
     long now = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
+    long txid = 0;
 
     // Lock row
     startRegionOperation();
@@ -3584,7 +3585,7 @@ public class HRegion implements HeapSize
          // Using default cluster id, as this can only happen in the originating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          this.log.append(regionInfo, this.htableDescriptor.getName(),
+          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
               walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
               this.htableDescriptor);
         }
@@ -3595,6 +3596,9 @@ public class HRegion implements HeapSize
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
     }
@@ -3622,6 +3626,7 @@ public class HRegion implements HeapSize
     checkRow(row, "increment");
     boolean flush = false;
     boolean wrongLength = false;
+    long txid = 0;
     // Lock row
     long result = amount;
     startRegionOperation();
@@ -3665,7 +3670,7 @@ public class HRegion implements HeapSize
             // Using default cluster id, as this can only happen in the
            // originating cluster. A slave cluster receives the final value (not
             // the delta) as a Put.
-            this.log.append(regionInfo, this.htableDescriptor.getName(),
+            txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
                 walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
                 this.htableDescriptor);
           }
@@ -3682,6 +3687,9 @@ public class HRegion implements HeapSize
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1177815&r1=1177814&r2=1177815&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Sep 30 20:22:22 2011
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.SortedMap;
@@ -130,6 +131,8 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
+  private final AtomicLong unflushedEntries = new AtomicLong(0);
+  private volatile long syncedTillHere = 0;
   private final Path oldLogDir;
   private boolean logRollRunning;
 
@@ -256,8 +259,9 @@ public class HLog implements Syncable {
   private static volatile long writeOps;
   private static volatile long writeTime;
   // For measuring latency of syncs
-  private static volatile long syncOps;
-  private static volatile long syncTime;
+  private static AtomicLong syncOps = new AtomicLong();
+  private static AtomicLong syncTime = new AtomicLong();
+  private static AtomicLong syncBatchSize = new AtomicLong();
   
   public static long getWriteOps() {
     long ret = writeOps;
@@ -272,15 +276,15 @@ public class HLog implements Syncable {
   }
 
   public static long getSyncOps() {
-    long ret = syncOps;
-    syncOps = 0;
-    return ret;
+    return syncOps.getAndSet(0);
   }
 
   public static long getSyncTime() {
-    long ret = syncTime;
-    syncTime = 0;
-    return ret;
+    return syncTime.getAndSet(0);
+  }
+
+  public static long getSyncBatchSize() {
+    return syncBatchSize.getAndSet(0);
   }
 
   /**
@@ -795,6 +799,15 @@ public class HLog implements Syncable {
     if (this.writer != null) {
       // Close the current writer, get a new one.
       try {
+        // Wait till all current transactions are written to the hlog.
+        // No new transactions can occur because we have the updatelock.
+        if (this.unflushedEntries.get() != this.syncedTillHere) {
+          LOG.debug("cleanupCurrentWriter " +
+                   " waiting for transactions to get synced " +
+                   " total " + this.unflushedEntries.get() +
+                   " synced till here " + syncedTillHere);
+          sync();
+        }
         this.writer.close();
         closeErrorCount.set(0);
       } catch (IOException e) {
@@ -953,14 +966,17 @@ public class HLog implements Syncable {
    * @param regionInfo
    * @param logEdit
    * @param logKey
+   * @param doSync shall we sync after writing the transaction
+   * @return The txid of this transaction
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
-                     HTableDescriptor htd)
+  public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
+                     HTableDescriptor htd, boolean doSync)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
+    long txid = 0;
     synchronized (updateLock) {
       long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
@@ -972,16 +988,19 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
+      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
     }
 
     // Sync if catalog region, and if not then check if that table supports
     // deferred log flushing
-    if (regionInfo.isMetaRegion() ||
-        !htd.isDeferredLogFlush()) {
+    if (doSync &&
+        (regionInfo.isMetaRegion() ||
+        !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync();
+      this.sync(txid);
     }
+    return txid;
   }
 
   /**
@@ -1022,15 +1041,18 @@ public class HLog implements Syncable {
    * @param edits
    * @param clusterId The originating clusterId for this edit (for replication)
    * @param now
+   * @param doSync shall we sync?
+   * @return txid of this transaction
    * @throws IOException
    */
-  public void append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
-      final long now, HTableDescriptor htd)
+  private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
+      final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-      if (edits.isEmpty()) return;
+      if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
+      long txid = 0;
       synchronized (this.updateLock) {
         long seqNum = obtainSeqNum();
         // The 'lastSeqWritten' map holds the sequence number of the oldest
@@ -1045,17 +1067,58 @@ public class HLog implements Syncable {
         HLogKey logKey = makeKey(hriKey, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
+        txid = this.unflushedEntries.incrementAndGet();
       }
       // Sync if catalog region, and if not then check if that table supports
       // deferred log flushing
-      if (info.isMetaRegion() ||
-          !htd.isDeferredLogFlush()) {
+      if (doSync && 
+          (info.isMetaRegion() ||
+          !htd.isDeferredLogFlush())) {
         // sync txn to file system
-        this.sync();
+        this.sync(txid);
       }
+      return txid;
     }
 
   /**
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id. The HLog is not flushed
+   * after this transaction is written to the log.
+   *
+   * @param info
+   * @param tableName
+   * @param edits
+   * @param clusterId The originating clusterId for this edit (for replication)
+   * @param now
+   * @return txid of this transaction
+   * @throws IOException
+   */
+  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, 
+    UUID clusterId, final long now, HTableDescriptor htd)
+    throws IOException {
+    return append(info, tableName, edits, clusterId, now, htd, false);
+  }
+
+  /**
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id. The HLog is flushed
+   * after this transaction is written to the log.
+   *
+   * @param info
+   * @param tableName
+   * @param edits
+   * @param clusterId The originating clusterId for this edit (for replication)
+   * @param now
+   * @return txid of this transaction
+   * @throws IOException
+   */
+  public long append(HRegionInfo info, byte [] tableName, WALEdit edits, 
+    UUID clusterId, final long now, HTableDescriptor htd)
+    throws IOException {
+    return append(info, tableName, edits, clusterId, now, htd, true);
+  }
+
+  /**
    * This thread is responsible to call syncFs and buffer up the writers while
    * it happens.
    */
@@ -1063,6 +1126,14 @@ public class HLog implements Syncable {
 
     private final long optionalFlushInterval;
 
+    // List of pending writes to the HLog. These correspond to transactions
+    // that have not yet returned to the client. We keep them cached here
+    // instead of writing them to HDFS piecemeal, because the HDFS write 
+    // method is pretty heavyweight as far as locking is concerned. The 
+    // goal is to increase the batchsize for writing-to-hdfs as well as
+    // sync-to-hdfs, so that we can get better system throughput.
+    private List<Entry> pendingWrites = new LinkedList<Entry>();
+
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
@@ -1075,7 +1146,9 @@ public class HLog implements Syncable {
         while(!this.isInterrupted()) {
 
           try {
-            Thread.sleep(this.optionalFlushInterval);
+            if (unflushedEntries.get() <= syncedTillHere) {
+              Thread.sleep(this.optionalFlushInterval);
+            }
             sync();
           } catch (IOException e) {
             LOG.error("Error while syncing, requesting close of hlog ", e);
@@ -1088,38 +1161,85 @@ public class HLog implements Syncable {
         LOG.info(getName() + " exiting");
       }
     }
+
+    // appends new writes to the pendingWrites. It is better to keep them in
+    // our own queue rather than writing them to the HDFS output stream because
+    // HDFSOutputStream.writeChunk is not lightweight at all.
+    synchronized void append(Entry e) throws IOException {
+      pendingWrites.add(e);
+    }
+
+    // Returns all currently pending writes. New writes
+    // will accumulate in a new list.
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
+
+    // writes out pending entries to the HLog
+    void hlogFlush(Writer writer) throws IOException {
+      // Atomically fetch all existing pending writes. New writes
+      // will start accumulating in a new list.
+      List<Entry> pending = getPendingWrites();
+
+      // write out all accumulated Entries to hdfs.
+      for (Entry e : pending) {
+        writer.append(e);
+      }
+    }
   }
 
+  // sync all known transactions
   private void syncer() throws IOException {
+    syncer(this.unflushedEntries.get()); // sync all pending items
+  }
+
+  // sync all transactions up to the specified txid
+  private void syncer(long txid) throws IOException {
     synchronized (this.updateLock) {
-      if (this.closed) {
-        return;
-      }
+      if (this.closed) return;
+    }
+    // if the transaction that we are interested in is already 
+    // synced, then return immediately.
+    if (txid <= this.syncedTillHere) {
+      return;
     }
     try {
+      long doneUpto = this.unflushedEntries.get();
       long now = System.currentTimeMillis();
       // Done in parallel for all writer threads, thanks to HDFS-895
       boolean syncSuccessful = true;
       try {
+        // First flush all the pending writes to HDFS. Then 
+        // issue the sync to HDFS. If sync is successful, then update
+        // syncedTillHere to indicate that transactions up to this
+        // number have been successfully synced.
+        logSyncerThread.hlogFlush(this.writer);
         this.writer.sync();
+        syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+        this.syncedTillHere = doneUpto;
       } catch(IOException io) {
         syncSuccessful = false;
       }
-      synchronized (this.updateLock) {
-        if (!syncSuccessful) {
+      if (!syncSuccessful) {
+        synchronized (this.updateLock) {
           // HBASE-4387, retry with updateLock held
           this.writer.sync();
+          syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+          this.syncedTillHere = doneUpto;
         }
-        syncTime += System.currentTimeMillis() - now;
-        syncOps++;
-        if (!this.logRollRunning) {
-          checkLowReplication();
-          if (this.writer.getLength() > this.logrollsize) {
-            requestLogRoll();
-          }
+      }
+      // We avoid acquiring the updateLock just to update statistics;
+      // these statistics are kept as AtomicLongs.
+      syncTime.addAndGet(System.currentTimeMillis() - now);
+      syncOps.incrementAndGet();
+      if (!this.logRollRunning) {
+        checkLowReplication();
+        if (this.writer.getLength() > this.logrollsize) {
+          requestLogRoll();
         }
       }
-
     } catch (IOException e) {
       LOG.fatal("Could not sync. Requesting close of hlog", e);
       requestLogRoll();
@@ -1212,6 +1332,10 @@ public class HLog implements Syncable {
     syncer();
   }
 
+  public void sync(long txid) throws IOException {
+    syncer(txid);
+  }
+
   private void requestLogRoll() {
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i: this.listeners) {
@@ -1235,8 +1359,8 @@ public class HLog implements Syncable {
       long now = System.currentTimeMillis();
       // coprocessor hook:
       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
-        // if not bypassed:
-        this.writer.append(new HLog.Entry(logKey, logEdit));
+        // write to our buffer for the Hlog file.
+        logSyncerThread.append(new HLog.Entry(logKey, logEdit));
       }
       long took = System.currentTimeMillis() - now;
       coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1357,18 +1481,20 @@ public class HLog implements Syncable {
       if (this.closed) {
         return;
       }
+      long txid = 0;
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
         WALEdit edit = completeCacheFlushLogEdit();
         HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
             System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-        this.writer.append(new Entry(key, edit));
+        logSyncerThread.append(new Entry(key, edit));
+        txid = this.unflushedEntries.incrementAndGet();
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
       }
       // sync txn to file system
-      this.sync();
+      this.sync(txid);
 
     } finally {
       // updateLock not needed for removing snapshot's entry
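
The group commit above rests on two counters: every append bumps
unflushedEntries and hands the new value back as a txid, while a successful
HDFS sync advances the syncedTillHere watermark. A sync request for a txid
at or below the watermark returns immediately, so many concurrent writers
can piggyback on a single writer.sync(). A simplified sketch of that logic
(error handling, the updateLock retry, and the LogSyncer thread are elided;
bufferWrite()/flushPendingWrites() are illustrative stand-ins for the
LogSyncer queue and hlogFlush()):

    private final AtomicLong unflushedEntries = new AtomicLong(0);
    private volatile long syncedTillHere = 0;

    long appendNoSync(Entry e) throws IOException {
      bufferWrite(e);                  // queue in LogSyncer.pendingWrites
      return unflushedEntries.incrementAndGet();
    }

    void sync(long txid) throws IOException {
      if (txid <= syncedTillHere) {
        return;                        // a concurrent sync already covered us
      }
      long doneUpto = unflushedEntries.get();
      flushPendingWrites();            // drain queued entries to the writer
      writer.sync();                   // one HDFS sync covers all <= doneUpto
      syncedTillHere = doneUpto;
    }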

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java?rev=1177815&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java Fri Sep 30 20:22:22 2011
@@ -0,0 +1,265 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of HRegion.incrementColumnValue
+ *
+ */
+public class TestIncrement extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestIncrement.class);
+
+  HRegion region = null;
+  private final String DIR = HBaseTestingUtility.getTestDir() +
+    "/TestIncrement/";
+
+  private final int MAX_VERSIONS = 2;
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion. 
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test one increment command.
+   */
+  public void testIncrementColumnValue() throws IOException {
+    LOG.info("Starting test testIncrementColumnValue");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+
+    assertEquals(value+amount, result);
+
+    Store store = region.getStore(fam1);
+    // ICV removes any extra values floating around in there.
+    assertEquals(1, store.memstore.kvset.size());
+    assertTrue(store.memstore.snapshot.isEmpty());
+
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  /**
+   * Test multi-threaded increments.
+   */
+  public void testIncrementMultiThreads() throws IOException {
+
+    LOG.info("Starting test testIncrementMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will increment by its own quantity
+    int numThreads = 100;
+    int incrementsPerThread = 1000;
+    Incrementer[] all = new Incrementer[numThreads];
+    int expectedTotal = 0;
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Incrementer(region, i, i, incrementsPerThread);
+      expectedTotal += (i * incrementsPerThread);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertICV(row, fam1, qual1, expectedTotal);
+    LOG.info("testIncrementMultiThreads successfully verified that total is " +
+             expectedTotal);
+  }
+
+
+  private void assertICV(byte [] row,
+                         byte [] family,
+                         byte[] qualifier,
+                         long amount) throws IOException {
+    // run a get and see?
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    long r = Bytes.toLong(kv.getValue());
+    assertEquals(amount, r);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+    byte[] ... families)
+  throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+    Configuration conf, byte [] ... families)
+  throws IOException{
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few increment calls
+   */
+  public static class Incrementer extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final int amount;
+
+    private int count;
+
+    public Incrementer(HRegion region, 
+        int threadNumber, int amount, int numIncrements) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.count = 0;
+      this.amount = amount;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      for (int i=0; i<numIncrements; i++) {
+        try {
+          long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+          // LOG.info("thread:" + threadNumber + " iter:" + i);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+        count++;
+      }
+    }
+  }
+
+
+}
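
As a sanity check on the numbers above: with the defaults in
testIncrementMultiThreads (100 threads, 1000 increments each, thread i
incrementing by i), the asserted total is 1000 * (0 + 1 + ... + 99) =
1000 * 4950 = 4,950,000. Any update lost under concurrent increments
would leave the final cell value short of that sum.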

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java?rev=1177815&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java Fri Sep 30 20:22:22 2011
@@ -0,0 +1,349 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Random;
+import java.text.NumberFormat;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHLogBench extends Configured implements Tool {
+
+  static final Log LOG = LogFactory.getLog(TestHLogBench.class);
+  private static final Random r = new Random();
+
+  private static final byte [] FAMILY = Bytes.toBytes("hlogbenchFamily");
+
+  // accumulate time here
+  private static int totalTime = 0;
+  private static Object lock = new Object();
+
+  // the file system where to create the Hlog file
+  protected FileSystem fs;
+
+  // the number of threads and the number of iterations per thread
+  private int numThreads = 300;
+  private int numIterationsPerThread = 10000;
+  private Path regionRootDir = new Path(HBaseTestingUtility.getTestDir() + 
+                                        "/TestHLogBench/");
+  private boolean appendNoSync = false;
+
+  public TestHLogBench() {
+    this(null);
+  }
+
+  private TestHLogBench(Configuration conf) {
+    super(conf);
+    fs = null;
+  }
+
+  /**
+   * Initialize file system object
+   */
+  public void init() throws IOException {
+    getConf().setQuietMode(true);
+    if (this.fs == null) {
+     this.fs = FileSystem.get(getConf());
+    }
+  }
+
+  /**
+   * Close down file system
+   */
+  public void close() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+  }
+
+  /**
+   * The main run method of TestHLogBench
+   */
+  public int run(String argv[]) throws Exception {
+
+    int exitCode = -1;
+    int i = 0;
+
+    // verify that we have enough command line parameters
+    if (argv.length < 4) {
+      printUsage("");
+      return exitCode;
+    }
+
+    // initialize LogBench
+    try {
+      init();
+    } catch (HBaseRPC.VersionMismatch v) {
+      LOG.warn("Version Mismatch between client and server" +
+               "... command aborted.");
+      return exitCode;
+    } catch (IOException e) {
+      LOG.warn("Bad connection to FS. command aborted.");
+      return exitCode;
+    }
+
+    try {
+      for (; i < argv.length; i++) {
+        if ("-numThreads".equals(argv[i])) {
+          i++;
+          this.numThreads = Integer.parseInt(argv[i]);
+        } else if ("-numIterationsPerThread".equals(argv[i])) {
+          i++;
+          this.numIterationsPerThread = Integer.parseInt(argv[i]);
+        } else if ("-path".equals(argv[i])) {
+          // get an absolute path using the default file system
+          i++;
+          this.regionRootDir = new Path(argv[i]);
+          this.regionRootDir = regionRootDir.makeQualified(this.fs);
+        } else if ("-nosync".equals(argv[i])) {
+          this.appendNoSync = true;
+        } else {
+          printUsage(argv[i]);
+          return exitCode;
+        }
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Illegal numThreads or numIterationsPerThread, " +
+               " a positive integer expected");
+      throw nfe;
+    }
+    go();
+    return 0;
+  }
+
+  private void go() throws IOException, InterruptedException {
+
+    long start = System.currentTimeMillis();
+    log("Running TestHLogBench with " + numThreads + " threads each doing " +
+        numIterationsPerThread + " HLog appends " +
+        (appendNoSync ? "nosync" : "sync") +
+        " at rootDir " + regionRootDir);
+
+    // Mock an HRegion
+    byte [] tableName = Bytes.toBytes("table");
+    byte [][] familyNames = new byte [][] { FAMILY };
+    HTableDescriptor htd = new HTableDescriptor();
+    htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
+    HRegion region = mockRegion(tableName, familyNames, regionRootDir);
+    HLog hlog = region.getLog();
+
+    // Spin up N threads to each perform M log operations
+    LogWriter [] incrementors = new LogWriter[numThreads];
+    for (int i=0; i<numThreads; i++) {
+      incrementors[i] = new LogWriter(region, tableName, hlog, i, 
+                                      numIterationsPerThread, 
+                                      appendNoSync);
+      incrementors[i].start();
+    }
+
+    // Wait for threads to finish
+    for (int i=0; i<numThreads; i++) {
+      //log("Waiting for #" + i + " to finish");
+      incrementors[i].join();
+    }
+
+    // Output statistics
+    long totalOps = numThreads * numIterationsPerThread;
+    log("Operations per second " + ((totalOps * 1000L)/totalTime));
+    log("Average latency in ms " + ((totalTime * 1000L)/totalOps));
+  }
+
+  /**
+   * Displays format of commands.
+   */
+  private static void printUsage(String cmd) {
+    String prefix = "Usage: java " + TestHLogBench.class.getSimpleName();
+    System.err.println(prefix + cmd + 
+                       " [-numThreads <number>] " +
+                       " [-numIterationsPerThread <number>] " +
+                       " [-path <path where region's root directory is created>]" +
+                       " [-nosync]");
+  }
+
+  /**
+   * A thread that writes data to an HLog
+   */
+  public static class LogWriter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final HLog hlog;
+    private boolean appendNoSync;
+    private byte[] tableName;
+
+    private int count;
+
+    public LogWriter(HRegion region, byte[] tableName,
+        HLog log, int threadNumber,
+        int numIncrements, boolean appendNoSync) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.hlog = log;
+      this.count = 0;
+      this.appendNoSync = appendNoSync;
+      this.tableName = tableName;
+      setDaemon(true);
+      //log("LogWriter[" + threadNumber + "] instantiated");
+    }
+
+    @Override
+    public void run() {
+      long now = System.currentTimeMillis();
+      byte [] key = Bytes.toBytes("thisisakey");
+      KeyValue kv = new KeyValue(key, now);
+      WALEdit walEdit = new WALEdit();
+      walEdit.add(kv);
+      HRegionInfo hri = region.getRegionInfo();
+      HTableDescriptor htd = new HTableDescriptor();
+      htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
+      boolean isMetaRegion = false;
+      long start = System.currentTimeMillis();
+      for (int i=0; i<numIncrements; i++) {
+        try {
+          if (appendNoSync) {
+            hlog.appendNoSync(hri, tableName, walEdit, 
+                              HConstants.DEFAULT_CLUSTER_ID, now, htd);
+          } else {
+            hlog.append(hri, tableName, walEdit, now, htd);
+          }
+        } catch (IOException e) {
+          log("Fatal exception: " + e);
+          e.printStackTrace();
+        }
+        count++;
+      }
+      long tot = System.currentTimeMillis() - start;
+      synchronized (lock) {
+        totalTime += tot;   // update global statistics
+      }
+
+    }
+  }
+
+  private static void log(String string) {
+    LOG.info(string);
+  }
+
+  private byte[][] makeBytes(int numArrays, int arraySize) {
+    byte [][] bytes = new byte[numArrays][];
+    for (int i=0; i<numArrays; i++) {
+      bytes[i] = new byte[arraySize];
+      r.nextBytes(bytes[i]);
+    }
+    return bytes;
+  }
+
+  /**
+   * Create a dummy region
+   */
+  private HRegion mockRegion(byte[] tableName, byte[][] familyNames,
+                             Path rootDir) throws IOException {
+
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    Configuration conf = htu.getConfiguration();
+    conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
+    conf.setBoolean("hbase.hregion.use.incrementnew", true);
+    conf.setBoolean("dfs.support.append", true);
+    FileSystem fs = FileSystem.get(conf);
+    int numQualifiers = 10;
+    byte [][] qualifiers = new byte [numQualifiers][];
+    for (int i=0; i<numQualifiers; i++) qualifiers[i] = Bytes.toBytes("qf" + i);
+    int numRows = 10;
+    byte [][] rows = new byte [numRows][];
+    for (int i=0; i<numRows; i++) rows[i] = Bytes.toBytes("r" + i);
+
+    // switch off debug message from Region server
+    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.WARN);
+
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte [] family : familyNames)
+      htd.addFamily(new HColumnDescriptor(family));
+
+    HRegionInfo hri = new HRegionInfo(tableName, Bytes.toBytes(0L), 
+                                      Bytes.toBytes(0xffffffffL));
+    if (fs.exists(rootDir)) {
+      if (!fs.delete(rootDir, true)) {
+        throw new IOException("Failed delete of " + rootDir);
+      }
+    }
+    return HRegion.createHRegion(hri, rootDir, conf, htd);
+  }
+
+  @Test
+  public void testLogPerformance() throws Exception {
+    TestHLogBench bench = new TestHLogBench();
+    int res;
+    String[] argv = new String[7];
+    argv[0] = "-numThreads";
+    argv[1] = Integer.toString(100);
+    argv[2] = "-numIterationsPerThread";
+    argv[3] = Integer.toString(1000);
+    argv[4] = "-path";
+    argv[5] = HBaseTestingUtility.getTestDir() + "/HlogPerformance";
+    argv[6] = "-nosync";
+    try {
+      res = ToolRunner.run(bench, argv);
+    } finally {
+      bench.close();
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestHLogBench bench = new TestHLogBench();
+    int res;
+    try {
+      res = ToolRunner.run(bench, argv);
+    } finally {
+      bench.close();
+    }
+    System.exit(res);
+  }
+}
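
For reference, TestHLogBench can also be driven standalone through its
main(); going by printUsage above, an invocation might look like the
following (classpath setup elided, path illustrative):

    java org.apache.hadoop.hbase.regionserver.wal.TestHLogBench \
        -numThreads 100 -numIterationsPerThread 1000 \
        -path /tmp/HlogPerformance -nosync

Comparing a -nosync run against a default run gives a rough measure of the
per-append sync cost that the batching above is meant to amortize.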

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java?rev=1177815&r1=1177814&r2=1177815&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java Fri Sep 30 20:22:22 2011
@@ -96,7 +96,7 @@ public class TestWALActionsListener {
       htd.addFamily(new HColumnDescriptor(b));
 
       HLogKey key = new HLogKey(b,b, 0, 0, HConstants.DEFAULT_CLUSTER_ID);
-      hlog.append(hri, key, edit, htd);
+      hlog.append(hri, key, edit, htd, true);
       if (i == 10) {
         hlog.registerWALActionsListener(laterobserver);
       }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1177815&r1=1177814&r2=1177815&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Fri Sep 30 20:22:22 2011
@@ -174,7 +174,7 @@ public class TestReplicationSourceManage
       LOG.info(i);
       HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-      hlog.append(hri, key, edit, htd);
+      hlog.append(hri, key, edit, htd, true);
     }
 
     // Simulate a rapid insert that's followed
@@ -187,7 +187,7 @@ public class TestReplicationSourceManage
     for (int i = 0; i < 3; i++) {
       HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-      hlog.append(hri, key, edit, htd);
+      hlog.append(hri, key, edit, htd, true);
     }
 
     assertEquals(6, manager.getHLogs().get(slaveId).size());
@@ -199,7 +199,7 @@ public class TestReplicationSourceManage
 
     HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-    hlog.append(hri, key, edit, htd);
+    hlog.append(hri, key, edit, htd, true);
 
     assertEquals(1, manager.getHLogs().size());
 


