accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1445876 - in /accumulo/trunk/server/src: main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
Date Wed, 13 Feb 2013 19:30:30 GMT
Author: ecn
Date: Wed Feb 13 19:30:29 2013
New Revision: 1445876

URL: http://svn.apache.org/r1445876
Log:
ACCUMULO-1062 serialize writes to ensure counts, rather than serializing the return using
check/notify for counts

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1445876&r1=1445875&r2=1445876&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
Wed Feb 13 19:30:29 2013
@@ -334,6 +334,8 @@ public class InMemoryMap {
   
   private AtomicInteger nextKVCount = new AtomicInteger(1);
   private AtomicInteger kvCount = new AtomicInteger(0);
+
+  private Object writeSerializer = new Object();
   
   /**
    * Applies changes to a row in the InMemoryMap
@@ -343,26 +345,20 @@ public class InMemoryMap {
     int numKVs = 0;
     for (int i = 0; i < mutations.size(); i++)
       numKVs += mutations.get(i).size();
-    int kv = nextKVCount.getAndAdd(numKVs);
-    try {
-      map.mutate(mutations, kv);
-    } finally {
-      synchronized (this) {
-        // Can not update mutationCount while writes that started before
-        // are in progress, this would cause partial mutations to be seen.
-        // Also, can not continue until mutation count is updated, because
-        // a read may not see a successful write. Therefore writes must
-        // wait for writes that started before to finish.
-        
-        while (kvCount.get() != kv - 1) {
-          try {
-            wait();
-          } catch (InterruptedException ex) {
-            // ignored
-          }
-        }
+    
+    // Can not update mutationCount while writes that started before
+    // are in progress, this would cause partial mutations to be seen.
+    // Also, can not continue until mutation count is updated, because
+    // a read may not see a successful write. Therefore writes must
+    // wait for writes that started before to finish.
+    //
+    // using separate lock from this map, to allow read/write in parallel
+    synchronized (writeSerializer ) {
+      int kv = nextKVCount.getAndAdd(numKVs);
+      try {
+        map.mutate(mutations, kv);
+      } finally {
         kvCount.set(kv + numKVs - 1);
-        notifyAll();
       }
     }
   }

Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java?rev=1445876&r1=1445875&r2=1445876&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
(original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
Wed Feb 13 19:30:29 2013
@@ -19,6 +19,10 @@ package org.apache.accumulo.server.table
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
@@ -38,6 +42,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Before;
+import org.junit.Test;
 
 public class InMemoryMapTest extends TestCase {
   
@@ -291,4 +296,54 @@ public class InMemoryMapTest extends Tes
     ae(skvi1, "r1", "foo:cq", 3, "v2");
     ae(skvi1, "r1", "foo:cq", 3, "v1");
   }
+  
+  private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
+
+  static long sum(long[] counts) {
+    long result = 0;
+    for (int i = 0; i < counts.length; i++) 
+      result  += counts[i];
+    return result;
+  }
+  
+  @Test
+  public void testParallelWriteSpeed() throws InterruptedException {
+    List<Double> timings = new ArrayList<Double>();
+    for (int threads: new int[]{1, 2, 16, 64, 256, 2048} ) {
+      final long now = System.currentTimeMillis();
+      final long counts[] = new long[threads];
+      final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+      ExecutorService e = Executors.newFixedThreadPool(threads);
+      for (int j = 0; j < threads; j++) {
+        final int threadId = j;
+        e.execute(new Runnable() {
+          @Override
+          public void run() {
+            while (System.currentTimeMillis() - now < 1000) {
+              for (int k = 0; k < 1000; k++) {
+                Mutation m = new Mutation("row");
+                m.put("cf", "cq", new Value("v".getBytes()));
+                List<Mutation> mutations = Collections.singletonList(m);
+                imm.mutate(mutations);
+                counts[threadId]++;
+              }
+            }
+          }
+        });
+      }
+      e.shutdown();
+      e.awaitTermination(10, TimeUnit.SECONDS);
+      imm.delete(10000);
+      double mutationsPerSecond = sum(counts)/((System.currentTimeMillis() - now)/1000.);
+      timings.add(mutationsPerSecond);
+      log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond,
threads));
+    }
+    // verify that more threads doesn't go a lot faster, or a lot slower than one thread
+    for (int i = 0; i < timings.size(); i++) {
+      double ratioFirst = timings.get(0) / timings.get(i); 
+      assertTrue(ratioFirst < 2);
+      assertTrue(ratioFirst > 0.5);
+    }
+  }
+
 }



Mime
View raw message