accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/2] git commit: ACCUMULO-1833 Simple usage of AtomicInteger to catch table cache invalidations and propagate them through MTBW's cache.
Date Fri, 08 Nov 2013 16:36:55 GMT
ACCUMULO-1833 Simple usage of AtomicInteger to catch table cache
invalidations and propagate them through MTBW's cache.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/29efac97
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/29efac97
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/29efac97

Branch: refs/heads/ACCUMULO-1833-caching
Commit: 29efac97c82edd9fc08d37a83676cfdc18503103
Parents: c8e607f
Author: Josh Elser <josh.elser@gmail.com>
Authored: Fri Nov 8 11:36:06 2013 -0500
Committer: Josh Elser <josh.elser@gmail.com>
Committed: Fri Nov 8 11:36:06 2013 -0500

----------------------------------------------------------------------
 .../client/impl/MultiTableBatchWriterImpl.java  | 29 ++++++-
 .../accumulo/core/client/impl/Tables.java       |  7 ++
 .../test/MultiTableBatchWriterTest.java         | 88 ++++++++++++--------
 3 files changed, 86 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/29efac97/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index 34318d1..b99c42b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -39,6 +40,7 @@ import org.apache.log4j.Logger;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 
 public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
   public static final long DEFAULT_CACHE_TIME = 200;
@@ -46,6 +48,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter
{
   
   static final Logger log = Logger.getLogger(MultiTableBatchWriterImpl.class);
   private AtomicBoolean closed;
+  private AtomicInteger cacheLastState;
 
   private class TableBatchWriter implements BatchWriter {
 
@@ -84,7 +87,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter
{
   private class TableNameToIdLoader extends CacheLoader<String,String> {
 
     @Override
-    public String load(String tableName) throws Exception {
+    public String load(String tableName) throws Exception {      
       String tableId = Tables.getNameToIdMap(instance).get(tableName);
 
       if (tableId == null)
@@ -113,6 +116,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter
{
     this.bw = new TabletServerBatchWriter(instance, credentials, config);
     tableWriters = new ConcurrentHashMap<String,BatchWriter>();
     this.closed = new AtomicBoolean(false);
+    this.cacheLastState = new AtomicInteger(0);
 
     nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(8).maximumSize(64).initialCapacity(16)
         .build(new TableNameToIdLoader());
@@ -151,6 +155,20 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter
{
   private String getId(String tableName) throws TableNotFoundException {
     try {
       return nameToIdCache.get(tableName);
+    } catch (UncheckedExecutionException e) {
+      Throwable cause = e.getCause();
+      
+      log.error("Unexpected exception when fetching table id for " + tableName);
+
+      if (null == cause) {
+        throw new RuntimeException(e);
+      } else if (cause instanceof TableNotFoundException) {
+        throw (TableNotFoundException) cause;
+      } else if (cause instanceof TableOfflineException) {
+        throw (TableOfflineException) cause;
+      }
+      
+      throw e;
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       
@@ -171,6 +189,15 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter
{
   @Override
   public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
     ArgumentChecker.notNull(tableName);
+    
+    int cacheResetCount = Tables.getCacheResetCount();
+    int internalResetCount = cacheLastState.get();
+    
+    if (cacheResetCount > internalResetCount) {
+      cacheLastState.set(cacheResetCount);
+      nameToIdCache.invalidateAll();
+    }
+    
     String tableId = getId(tableName);
 
     BatchWriter tbw = tableWriters.get(tableId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/29efac97/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
index 71518c5..2a85954 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -31,6 +32,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCache;
 
 public class Tables {
   private static SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission");
+  private static AtomicInteger cacheResetCount = new AtomicInteger(0);
   
   private static ZooCache getZooCache(Instance instance) {
     SecurityManager sm = System.getSecurityManager();
@@ -89,6 +91,7 @@ public class Tables {
   }
   
   public static void clearCache(Instance instance) {
+    cacheResetCount.incrementAndGet();
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES);
   }
   
@@ -111,4 +114,8 @@ public class Tables {
     
     return TableState.valueOf(new String(state));
   }
+  
+  public static int getCacheResetCount() {
+    return cacheResetCount.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/29efac97/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
index 8ed6931..cee1b90 100644
--- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
@@ -161,11 +161,27 @@ public class MultiTableBatchWriterTest {
       bw2.addMutation(m1);
 
       tops.rename(table1, newTable1);
-      tops.rename(table2, newTable2);
 
-      // MTBW is still caching this name to the correct table
-      bw1 = mtbw.getBatchWriter(table1);
-      bw2 = mtbw.getBatchWriter(table2);
+      // MTBW is still caching this name to the correct table, but we should invalidate its
cache
+      // after seeing the rename
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+        Assert.fail("Should not be able to find this table");
+      } catch (TableNotFoundException e) {
+        // pass
+      }
+
+      tops.rename(table2, newTable2);
+      
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+        Assert.fail("Should not be able to find this table");
+      } catch (TableNotFoundException e) {
+        //pass
+      }
+      
+      bw1 = mtbw.getBatchWriter(newTable1);
+      bw2 = mtbw.getBatchWriter(newTable2);
 
       Mutation m2 = new Mutation("bar");
       m2.put("col1", "", "val1");
@@ -284,15 +300,14 @@ public class MultiTableBatchWriterTest {
       bw1.addMutation(m2);
       bw2.addMutation(m2);
 
-      try {
-        mtbw.close();
-        Assert.fail("Should not be able to close batch writers");
-      } catch (MutationsRejectedException e) {
-        // Pass
-      }
     } finally {
       if (null != mtbw) {
-        mtbw.close();
+        try {
+          mtbw.close();
+          Assert.fail("Should not be able to close batch writers");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
       }
     }
   }
@@ -333,16 +348,14 @@ public class MultiTableBatchWriterTest {
 
       bw1.addMutation(m2);
       bw2.addMutation(m2);
-
-      try {
-        mtbw.close();
-        Assert.fail("Should not be able to close batch writers");
-      } catch (MutationsRejectedException e) {
-        // Pass
-      }
     } finally {
       if (null != mtbw) {
-        mtbw.close();
+        try {
+          mtbw.close();
+          Assert.fail("Should not be able to close batch writers");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
       }
 
     }
@@ -376,27 +389,28 @@ public class MultiTableBatchWriterTest {
       bw2.addMutation(m1);
 
       tops.offline(table1);
-      tops.offline(table2);
-
-      bw1 = mtbw.getBatchWriter(table1);
-      bw2 = mtbw.getBatchWriter(table2);
 
-      Mutation m2 = new Mutation("bar");
-      m2.put("col1", "", "val1");
-      m2.put("col2", "", "val2");
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+      } catch (TableOfflineException e) {
+        // pass
+      }
 
-      bw1.addMutation(m2);
-      bw2.addMutation(m2);
+      tops.offline(table2);
 
       try {
-        mtbw.close();
-        Assert.fail("Should not be able to close batch writers");
-      } catch (MutationsRejectedException e) {
-        // Pass
+        bw2 = mtbw.getBatchWriter(table2);
+      } catch (TableOfflineException e) {
+        // pass
       }
     } finally {
       if (null != mtbw) {
-        mtbw.close();
+        try {
+          mtbw.close();
+          Assert.fail("Expecting close on MTBW to fail due to offline tables");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
       }
     }
   }
@@ -434,15 +448,15 @@ public class MultiTableBatchWriterTest {
       try {
         bw1 = mtbw.getBatchWriter(table1);
         Assert.fail(table1 + " should be offline");
-      } catch (UncheckedExecutionException e) {
-        Assert.assertEquals(TableOfflineException.class, e.getCause().getClass());
+      } catch (TableOfflineException e) {
+        // pass
       }
 
       try {
         bw2 = mtbw.getBatchWriter(table2);
         Assert.fail(table1 + " should be offline");
-      } catch (UncheckedExecutionException e) {
-        Assert.assertEquals(TableOfflineException.class, e.getCause().getClass());
+      } catch (TableOfflineException e) {
+        // pass
       }
     } finally {
       if (null != mtbw) {


Mime
View raw message