accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1466199 - in /accumulo/branches/1.4/src/server/src: main/java/org/apache/accumulo/server/master/tableOps/ main/java/org/apache/accumulo/server/zookeeper/ test/java/org/apache/accumulo/server/zookeeper/
Date Tue, 09 Apr 2013 20:00:31 GMT
Author: ecn
Date: Tue Apr  9 20:00:31 2013
New Revision: 1466199

URL: http://svn.apache.org/r1466199
Log:
ACCUMULO-1044 clearly define "transaction should stop" and "transaction is complete"

Modified:
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
    accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1466199&r1=1466198&r2=1466199&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Apr  9 20:00:31 2013
@@ -303,6 +303,8 @@ class CleanUpBulkImport extends MasterRe
     Utils.unreserveHdfsDirectory(source, tid);
     Utils.unreserveHdfsDirectory(error, tid);
     Utils.getReadLock(tableId, tid).unlock();
+    log.debug("completing bulk import transaction " + tid);
+    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
     return null;
   }
 }

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java?rev=1466199&r1=1466198&r2=1466199&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java Tue Apr  9 20:00:31 2013
@@ -38,6 +38,7 @@ public class TransactionWatcher {
   
   public interface Arbitrator {
     boolean transactionAlive(String type, long tid) throws Exception;
+    boolean transactionComplete(String type, long tid) throws Exception;
   }
   
   public static class ZooArbitrator implements Arbitrator {
@@ -47,7 +48,7 @@ public class TransactionWatcher {
     
     @Override
     public boolean transactionAlive(String type, long tid) throws Exception {
-      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid);
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid;
       rdr.sync(path);
       return rdr.exists(path);
     }
@@ -57,6 +58,7 @@ public class TransactionWatcher {
       IZooReaderWriter writer = ZooReaderWriter.getInstance();
       writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type, new byte[] {}, NodeExistsPolicy.OVERWRITE);
      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, new byte[] {}, NodeExistsPolicy.OVERWRITE);
+      writer.putPersistentData(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", new byte[] {}, NodeExistsPolicy.OVERWRITE);
     }
     
     public static void stop(String type, long tid) throws KeeperException, InterruptedException {
@@ -64,6 +66,20 @@ public class TransactionWatcher {
       IZooReaderWriter writer = ZooReaderWriter.getInstance();
       writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
     }
+    
+    public static void cleanup(String type, long tid) throws KeeperException, InterruptedException {
+      Instance instance = HdfsZooInstance.getInstance();
+      IZooReaderWriter writer = ZooReaderWriter.getInstance();
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid, NodeMissingPolicy.SKIP);
+      writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
+    }
+
+    @Override
+    public boolean transactionComplete(String type, long tid) throws Exception {
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";
+      rdr.sync(path);
+      return !rdr.exists(path);
+    }
   }
   
   public TransactionWatcher(Arbitrator arbitrator) {

Modified: accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java?rev=1466199&r1=1466198&r2=1466199&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java (original)
+++ accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java Tue Apr  9 20:00:31 2013
@@ -31,7 +31,8 @@ public class TransactionWatcherTest {
   
   static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
     Map<String,List<Long>> map = new HashMap<String,List<Long>>();
-    
+    Map<String,List<Long>> cleanedUp = new HashMap<String,List<Long>>();
+
     public synchronized void start(String txType, Long txid) throws Exception {
       List<Long> txids = map.get(txType);
       if (txids == null)
@@ -40,6 +41,14 @@ public class TransactionWatcherTest {
         throw new Exception("transaction already started");
       txids.add(txid);
       map.put(txType, txids);
+      
+      txids = cleanedUp.get(txType);
+      if (txids == null)
+        txids = new ArrayList<Long>();
+      if (txids.contains(txid))
+        throw new IllegalStateException("transaction was started but not cleaned up");
+      txids.add(txid);
+      cleanedUp.put(txType, txids);
     }
     
     public synchronized void stop(String txType, Long txid) throws Exception {
@@ -51,6 +60,15 @@ public class TransactionWatcherTest {
       throw new Exception("transaction does not exist");
     }
     
+    public synchronized void cleanup(String txType, Long txid) throws Exception {
+      List<Long> txids = cleanedUp.get(txType);
+      if (txids != null && txids.contains(txid)) {
+        txids.remove(txids.indexOf(txid));
+        return;
+      }
+      throw new Exception("transaction does not exist");
+    }
+
     @Override
     synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
       List<Long> txids = map.get(txType);
@@ -59,6 +77,14 @@ public class TransactionWatcherTest {
       return txids.contains(tid);
     }
     
+    @Override
+    public boolean transactionComplete(String txType, long tid) throws Exception {
+      List<Long> txids = cleanedUp.get(txType);
+      if (txids == null)
+        return true;
+      return !txids.contains(tid);
+    }
+
   }
   
   @Test
@@ -84,8 +110,12 @@ public class TransactionWatcherTest {
       }
     });
     Assert.assertFalse(txw.isActive(txid));
+    Assert.assertFalse(sa.transactionComplete(txType,  txid));
     sa.stop(txType, txid);
     Assert.assertFalse(sa.transactionAlive(txType, txid));
+    Assert.assertFalse(sa.transactionComplete(txType,  txid));
+    sa.cleanup(txType, txid);
+    Assert.assertTrue(sa.transactionComplete(txType,  txid));
     try {
       txw.run(txType, txid, new Callable<Object>() {
         @Override



Mime
View raw message