hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1162295 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ src/test/java/org/apache/hadoop/hbase/regionserver/
Date Sat, 27 Aug 2011 04:26:03 GMT
Author: stack
Date: Sat Aug 27 04:26:03 2011
New Revision: 1162295

URL: http://svn.apache.org/viewvc?rev=1162295&view=rev
Log:
HBASE-3900 Expose progress of a major compaction in UI and/or in shell

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerLoad.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Aug 27 04:26:03 2011
@@ -410,6 +410,8 @@ Release 0.91.0 - Unreleased
    HBASE-4241  Optimize flushing of the Memstore (Lars Hofhansl)
    HBASE-4248  Enhancements for Filter Language exposing HBase filters through
                the Thrift API (Anirudh Todi)
+   HBASE-3900  Expose progress of a major compaction in UI and/or in shell
+               (Brad Anderson)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerLoad.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerLoad.java?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerLoad.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HServerLoad.java Sat Aug 27 04:26:03
2011
@@ -97,6 +97,10 @@ implements WritableComparable<HServerLoa
     private int readRequestsCount;
     /** the current total write requests made to region */
     private int writeRequestsCount;
+    /** the total compacting key values in currently running compaction */
+    private long totalCompactingKVs;
+    /** the completed count of key values in currently running compaction */
+    private long currentCompactedKVs;
 
     /** The current total size of root-level indexes for the region, in KB. */
     private int rootIndexSizeKB;
@@ -121,11 +125,14 @@ implements WritableComparable<HServerLoa
      * @param name
      * @param stores
      * @param storefiles
+     * @param storeUncompressedSizeMB
      * @param storefileSizeMB
      * @param memstoreSizeMB
      * @param storefileIndexSizeMB
      * @param readRequestsCount
      * @param writeRequestsCount
+     * @param totalCompactingKVs
+     * @param currentCompactedKVs
      */
     public RegionLoad(final byte[] name, final int stores,
         final int storefiles, final int storeUncompressedSizeMB,
@@ -133,7 +140,8 @@ implements WritableComparable<HServerLoa
         final int memstoreSizeMB, final int storefileIndexSizeMB,
         final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
         final int totalStaticBloomSizeKB,
-        final int readRequestsCount, final int writeRequestsCount) {
+        final int readRequestsCount, final int writeRequestsCount,
+        final long totalCompactingKVs, final long currentCompactedKVs) {
       this.name = name;
       this.stores = stores;
       this.storefiles = storefiles;
@@ -146,6 +154,8 @@ implements WritableComparable<HServerLoa
       this.totalStaticBloomSizeKB = totalStaticBloomSizeKB;
       this.readRequestsCount = readRequestsCount;
       this.writeRequestsCount = writeRequestsCount;
+      this.totalCompactingKVs = totalCompactingKVs;
+      this.currentCompactedKVs = currentCompactedKVs;
     }
 
     // Getters
@@ -198,7 +208,7 @@ implements WritableComparable<HServerLoa
     public int getStorefileIndexSizeMB() {
       return storefileIndexSizeMB;
     }
-    
+
     /**
      * @return the number of requests made to region
      */
@@ -220,6 +230,20 @@ implements WritableComparable<HServerLoa
       return writeRequestsCount;
     }
 
+    /**
+     * @return the total number of kvs in current compaction
+     */
+    public long getTotalCompactingKVs() {
+      return totalCompactingKVs;
+    }
+
+    /**
+     * @return the number of already compacted kvs in current compaction
+     */
+    public long getCurrentCompactedKVs() {
+      return currentCompactedKVs;
+    }
+
     // Setters
 
     /**
@@ -272,6 +296,21 @@ implements WritableComparable<HServerLoa
       this.writeRequestsCount = requestsCount;
     }
 
+    /**
+     * @param totalCompactingKVs the number of kvs total in current compaction
+     */
+    public void setTotalCompactingKVs(int totalCompactingKVs) {
+      this.totalCompactingKVs = totalCompactingKVs;
+    }
+
+    /**
+     * @param currentCompactedKVs the number of kvs already compacted in
+     * current compaction
+     */
+    public void setCurrentCompactedKVs(int currentCompactedKVs) {
+      this.currentCompactedKVs = currentCompactedKVs;
+    }
+
     // Writable
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
@@ -291,6 +330,8 @@ implements WritableComparable<HServerLoa
       this.rootIndexSizeKB = in.readInt();
       this.totalStaticIndexSizeKB = in.readInt();
       this.totalStaticBloomSizeKB = in.readInt();
+      this.totalCompactingKVs = in.readInt();
+      this.currentCompactedKVs = in.readInt();
     }
 
     public void write(DataOutput out) throws IOException {
@@ -309,6 +350,8 @@ implements WritableComparable<HServerLoa
       out.writeInt(rootIndexSizeKB);
       out.writeInt(totalStaticIndexSizeKB);
       out.writeInt(totalStaticBloomSizeKB);
+      out.writeLong(totalCompactingKVs);
+      out.writeLong(currentCompactedKVs);
     }
 
     /**
@@ -327,7 +370,7 @@ implements WritableComparable<HServerLoa
       if (this.storeUncompressedSizeMB != 0) {
         sb = Strings.appendKeyValue(sb, "compressionRatio",
             String.format("%.4f", (float)this.storefileSizeMB/
-                (float)this.storeUncompressedSizeMB));        
+                (float)this.storeUncompressedSizeMB));
       }
       sb = Strings.appendKeyValue(sb, "memstoreSizeMB",
         Integer.valueOf(this.memstoreSizeMB));
@@ -343,6 +386,17 @@ implements WritableComparable<HServerLoa
           Integer.valueOf(this.totalStaticIndexSizeKB));
       sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
         Integer.valueOf(this.totalStaticBloomSizeKB));
+      sb = Strings.appendKeyValue(sb, "totalCompactingKVs",
+          Long.valueOf(this.totalCompactingKVs));
+      sb = Strings.appendKeyValue(sb, "currentCompactedKVs",
+          Long.valueOf(this.currentCompactedKVs));
+      float compactionProgressPct = Float.NaN;
+      if( this.totalCompactingKVs > 0 ) {
+        compactionProgressPct = Float.valueOf(
+            this.currentCompactedKVs / this.totalCompactingKVs);
+      }
+      sb = Strings.appendKeyValue(sb, "compactionProgressPct",
+          compactionProgressPct);
       return sb.toString();
     }
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat
Aug 27 04:26:03 2011
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.Invoc
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
@@ -631,9 +632,7 @@ public class HRegionServer implements HR
             closeUserRegions(this.abortRequested);
           } else if (this.stopping) {
             LOG.info("Stopping meta regions, if the HRegionServer hosts any");
-
             boolean allUserRegionsOffline = areAllUserRegionsOffline();
-
             if (allUserRegionsOffline) {
               // Set stopped if no requests since last time we went around the loop.
               // The remaining meta regions will be closed on our way out.
@@ -926,6 +925,8 @@ public class HRegionServer implements HR
     int rootIndexSizeKB = 0;
     int totalStaticIndexSizeKB = 0;
     int totalStaticBloomSizeKB = 0;
+    long totalCompactingKVs = 0;
+    long currentCompactedKVs = 0;
     synchronized (r.stores) {
       stores += r.stores.size();
       for (Store store : r.stores.values()) {
@@ -934,6 +935,11 @@ public class HRegionServer implements HR
             / 1024 / 1024);
         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
+        CompactionProgress progress = store.getCompactionProgress();
+        if (progress != null) {
+          totalCompactingKVs += progress.totalCompactingKVs;
+          currentCompactedKVs += progress.currentCompactedKVs;
+        }
 
         rootIndexSizeKB +=
             (int) (store.getStorefilesIndexSize() / 1024);
@@ -949,7 +955,8 @@ public class HRegionServer implements HR
         storeUncompressedSizeMB,
         storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
         totalStaticIndexSizeKB, totalStaticBloomSizeKB,
-        (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get());
+        (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
+        totalCompactingKVs, currentCompactedKVs);
   }
 
   /**
@@ -2531,6 +2538,8 @@ public class HRegionServer implements HR
     int storefileSizeMB = 0;
     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
     int storefileIndexSizeMB = 0;
+    long totalCompactingKVs = 0;
+    long currentCompactedKVs = 0;
     synchronized (r.stores) {
       stores += r.stores.size();
       for (Store store : r.stores.values()) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Aug 27 04:26:03
2011
@@ -37,14 +37,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -112,6 +118,7 @@ public class Store implements HeapSize {
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final String storeNameStr;
   private final boolean inMemory;
+  private CompactionProgress progress;
 
   /*
    * List of store files inside this store. This is an immutable list that
@@ -766,7 +773,7 @@ public class Store implements HeapSize {
    * @param dir
    * @throws IOException
    */
-  public static long getLowestTimestamp(final List<StoreFile> candidates) 
+  public static long getLowestTimestamp(final List<StoreFile> candidates)
       throws IOException {
     long minTs = Long.MAX_VALUE;
     for (StoreFile storeFile : candidates) {
@@ -775,6 +782,13 @@ public class Store implements HeapSize {
     return minTs;
   }
 
+  /** getter for CompactionProgress object
+   * @return CompactionProgress object
+   */
+  public CompactionProgress getCompactionProgress() {
+    return this.progress;
+  }
+
   /*
    * @return True if we should run a major compaction.
    */
@@ -830,7 +844,7 @@ public class Store implements HeapSize {
           }
         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
           LOG.debug("Major compaction triggered on store " + this.storeNameStr +
-            ", because keyvalues outdated; time since last major compaction " + 
+            ", because keyvalues outdated; time since last major compaction " +
             (now - lowTimestamp) + "ms");
           result = true;
         }
@@ -1090,6 +1104,9 @@ public class Store implements HeapSize {
       }
     }
 
+    // keep track of compaction progress
+    progress = new CompactionProgress(maxKeyCount);
+
     // For each file, obtain a scanner:
     List<StoreFileScanner> scanners = StoreFileScanner
       .getScannersForStoreFiles(filesToCompact, false, false);
@@ -1117,6 +1134,8 @@ public class Store implements HeapSize {
             // output to writer:
             for (KeyValue kv : kvs) {
               writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
 
               // check periodically to see if a system stop is requested
               if (Store.closeCheckInterval > 0) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Sat Aug 27 04:26:03 2011
@@ -77,7 +77,7 @@ public class CompactionRequest implement
     /**
      * This function will define where in the priority queue the request will
      * end up.  Those with the highest priorities will be first.  When the
-     * priorities are the same it will It will first compare priority then date
+     * priorities are the same it will first compare priority then date
      * to maintain a FIFO functionality.
      *
      * <p>Note: The date is only accurate to the millisecond which means it is

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1162295&r1=1162294&r2=1162295&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Sat
Aug 27 04:26:03 2011
@@ -19,9 +19,15 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,20 +37,18 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 
 /**
@@ -161,9 +165,26 @@ public class TestCompaction extends HBas
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100),
null);
     assertEquals(compactionThreshold, result.size());
 
+    // see if CompactionProgress is in place but null
+    for (Store store: this.r.stores.values()) {
+      assertNull(store.getCompactionProgress());
+    }
+
     r.flushcache();
     r.compactStores(true);
 
+    // see if CompactionProgress has done its thing on at least one store
+    int storeCount = 0;
+    for (Store store: this.r.stores.values()) {
+      CompactionProgress progress = store.getCompactionProgress();
+      if( progress != null ) {
+        ++storeCount;
+        assert(progress.currentCompactedKVs > 0);
+        assert(progress.totalCompactingKVs > 0);
+      }
+      assert(storeCount > 0);
+    }
+
     // look at the second row
     // Increment the least significant character so we get to next row.
     byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);



Mime
View raw message