accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1438319 - /accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Date Fri, 25 Jan 2013 04:39:25 GMT
Author: kturner
Date: Fri Jan 25 04:39:25 2013
New Revision: 1438319

URL: http://svn.apache.org/viewvc?rev=1438319&view=rev
Log:
ACCUMULO-875 list compactions was not showing entries read unless something was written

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1438319&r1=1438318&r2=1438319&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Fri Jan 25 04:39:25 2013
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
@@ -38,11 +39,12 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.CountingIterator;
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
@@ -67,6 +69,43 @@ import org.apache.log4j.Logger;
 
 public class Compactor implements Callable<CompactionStats> {
   
+  public class CountingIterator extends WrappingIterator {
+    
+    private long count;
+    
+    public CountingIterator deepCopy(IteratorEnvironment env) {
+      return new CountingIterator(this, env);
+    }
+    
+    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+      setSource(other.getSource().deepCopy(env));
+      count = 0;
+    }
+    
+    public CountingIterator(SortedKeyValueIterator<Key,Value> source) {
+      this.setSource(source);
+      count = 0;
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void next() throws IOException {
+      super.next();
+      count++;
+      if (count % 1024 == 0) {
+        entriesRead.addAndGet(1024);
+      }
+    }
+    
+    public long getCount() {
+      return count;
+    }
+  }
+
   private static final Logger log = Logger.getLogger(Compactor.class);
   
   static class CompactionCanceledException extends Exception {
@@ -93,37 +132,22 @@ public class Compactor implements Callab
   // things to report
   private String currentLocalityGroup = "";
   private long startTime;
-  private long currentEntriesRead = 0;
-  private long currentEntriesWritten = 0;
-  private long totalEntriesRead = 0;
-  private long totalEntriesWritten = 0;
+
   private MajorCompactionReason reason;
   protected MinorCompactionReason mincReason;
   
-  private synchronized void updateStats(long read, long written) {
-    this.currentEntriesRead = read;
-    this.currentEntriesWritten = written;
-  }
-
-  private synchronized void clearStats() {
-    totalEntriesRead = 0;
-    totalEntriesWritten = 0;
-    currentEntriesRead = 0;
-    currentEntriesWritten = 0;
-    currentLocalityGroup = "";
-  }
-
-  private synchronized void rollStats() {
-    this.totalEntriesRead = currentEntriesRead;
-    this.totalEntriesWritten = currentEntriesWritten;
-    currentEntriesRead = 0;
-    currentEntriesWritten = 0;
-  }
+  private AtomicLong entriesRead = new AtomicLong(0);
+  private AtomicLong entriesWritten = new AtomicLong(0);
   
   private synchronized void setLocalityGroup(String name) {
     this.currentLocalityGroup = name;
   }
 
+  private void clearStats() {
+    entriesRead.set(0);
+    entriesWritten.set(0);
+  }
+
   protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new
HashSet<Compactor>());
   
   public static class CompactionInfo {
@@ -134,13 +158,9 @@ public class Compactor implements Callab
     private long entriesWritten;
     
     CompactionInfo(Compactor compactor) {
-      // get a consistent snapshot of changing stats
-      synchronized (compactor) {
-        this.localityGroup = compactor.currentLocalityGroup;
-        this.entriesRead = compactor.totalEntriesRead + compactor.currentEntriesRead;
-        this.entriesWritten = compactor.totalEntriesWritten + compactor.currentEntriesWritten;
-      }
-      
+      this.localityGroup = compactor.currentLocalityGroup;
+      this.entriesRead = compactor.entriesRead.get();
+      this.entriesWritten = compactor.entriesWritten.get();
       this.compactor = compactor;
     }
 
@@ -334,7 +354,7 @@ public class Compactor implements Callab
       }
     }
   }
-  
+
   private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName,
ArrayList<FileSKVIterator> readers) throws IOException {
     
     List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
@@ -429,12 +449,10 @@ public class Compactor implements Callab
           entriesCompacted++;
           
           if (entriesCompacted % 1024 == 0) {
-            // Periodically update stats, do not want to do this too often since its syncronized
-            updateStats(citr.getCount(), entriesCompacted);
+            // Periodically update stats, do not want to do this too often since its volatile
+            entriesWritten.addAndGet(1024);
           }
         }
-        
-        rollStats();
 
         if (itr.hasTop() && !env.isCompactionEnabled()) {
           // cancel major compaction operation



Mime
View raw message