accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1355481 [1/4] - in /accumulo/trunk: ./ bin/ core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org...
Date Fri, 29 Jun 2012 17:42:56 GMT
Author: billie
Date: Fri Jun 29 17:42:35 2012
New Revision: 1355481

URL: http://svn.apache.org/viewvc?rev=1355481&view=rev
Log:
ACCUMULO-647 - moved fate and zookeeper code to a separate module

Added:
    accumulo/trunk/fate/
    accumulo/trunk/fate/pom.xml   (with props)
    accumulo/trunk/fate/src/
    accumulo/trunk/fate/src/main/
    accumulo/trunk/fate/src/main/java/
    accumulo/trunk/fate/src/main/java/org/
    accumulo/trunk/fate/src/main/java/org/apache/
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java   (with props)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java   (with props)
    accumulo/trunk/fate/src/test/
    accumulo/trunk/fate/src/test/java/
    accumulo/trunk/fate/src/test/java/org/
    accumulo/trunk/fate/src/test/java/org/apache/
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java   (with props)
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java   (with props)
Removed:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Fate.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Print.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Repo.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/StackOverflowException.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/TStore.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/ZooStore.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedReadWriteLock.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReservation.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/zookeeper/
Modified:
    accumulo/trunk/bin/tool.sh
    accumulo/trunk/core/pom.xml
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
    accumulo/trunk/pom.xml
    accumulo/trunk/server/pom.xml
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ChangeTableState.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CloneTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/MasterRepo.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/RenameTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/Utils.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReport.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/security/ZKAuthenticator.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/VerifyIngest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestClean.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/DumpZookeeper.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RestoreZookeeper.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
    accumulo/trunk/test/system/auto/simple/examples.py
    accumulo/trunk/test/system/auto/simple/mapreduce.py

Modified: accumulo/trunk/bin/tool.sh
URL: http://svn.apache.org/viewvc/accumulo/trunk/bin/tool.sh?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/bin/tool.sh (original)
+++ accumulo/trunk/bin/tool.sh Fri Jun 29 17:42:35 2012
@@ -33,6 +33,7 @@ LIB=$ACCUMULO_HOME/lib
 
 ZOOKEEPER_CMD='ls -1 $ZOOKEEPER_HOME/zookeeper-[0-9]*[^csn].jar '
 CORE_CMD='ls -1 $LIB/accumulo-core-*[^cs].jar'
+FATE_CMD='ls -1 $LIB/accumulo-fate-*[^cs].jar'
 THRIFT_CMD='ls -1 $LIB/libthrift-*[^cs].jar'
 CLOUDTRACE_CMD='ls -1 $LIB/cloudtrace-*[^cs].jar'
 
@@ -46,6 +47,11 @@ if [ `eval $CORE_CMD | wc -l` != "1" ] ;
     exit 1
 fi
 
+if [ `eval $FATE_CMD | wc -l` != "1" ] ; then
+    echo "Not exactly one accumulo-fate jar in $LIB"
+    exit 1
+fi
+
 if [ `eval $THRIFT_CMD | wc -l` != "1" ] ; then
     echo "Not exactly one thrift jar in $LIB"
     exit 1
@@ -58,6 +64,7 @@ fi
 
 ZOOKEEPER_LIB=`eval $ZOOKEEPER_CMD`
 CORE_LIB=`eval $CORE_CMD`
+FATE_LIB=`eval $FATE_CMD`
 THRIFT_LIB=`eval $THRIFT_CMD`
 CLOUDTRACE_LIB=`eval $CLOUDTRACE_CMD`
 
@@ -80,8 +87,8 @@ for arg in "$@"; do
   fi
 done
 
-LIB_JARS="$THRIFT_LIB,$CORE_LIB,$ZOOKEEPER_LIB,$CLOUDTRACE_LIB"
-H_JARS="$THRIFT_LIB:$CORE_LIB:$ZOOKEEPER_LIB:$CLOUDTRACE_LIB:"
+LIB_JARS="$THRIFT_LIB,$CORE_LIB,$FATE_LIB,$ZOOKEEPER_LIB,$CLOUDTRACE_LIB"
+H_JARS="$THRIFT_LIB:$CORE_LIB:$FATE_LIB:$ZOOKEEPER_LIB:$CLOUDTRACE_LIB:"
 
 COMMONS_LIBS=`ls -1 $LIB/commons-*.jar`
 for jar in $USERJARS $COMMONS_LIBS; do

Modified: accumulo/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/pom.xml?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/pom.xml (original)
+++ accumulo/trunk/core/pom.xml Fri Jun 29 17:42:35 2012
@@ -81,6 +81,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-fate</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>cloudtrace</artifactId>
     </dependency>
     <dependency>

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java Fri Jun 29 17:42:35 2012
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.util.Byt
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java Fri Jun 29 17:42:35 2012
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 
 /**
  * Provides a class for administering the accumulo instance

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Fri Jun 29 17:42:35 2012
@@ -34,8 +34,8 @@ import org.apache.accumulo.core.util.Ser
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.transport.TTransport;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java Fri Jun 29 17:42:35 2012
@@ -26,8 +26,8 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 
 public class Tables {
   private static SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission");

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java Fri Jun 29 17:42:35 2012
@@ -23,8 +23,8 @@ import java.net.UnknownHostException;
 import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.zookeeper.ZooReader;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.zookeeper.KeeperException;
 
 

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java Fri Jun 29 17:42:35 2012
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.accumulo.cloudtrace.instrument.receivers.SendSpansViaThrift;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Fri Jun 29 17:42:35 2012
@@ -129,7 +129,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.UserPermissionsCommand;
 import org.apache.accumulo.core.util.shell.commands.UsersCommand;
 import org.apache.accumulo.core.util.shell.commands.WhoAmICommand;
-import org.apache.accumulo.core.zookeeper.ZooReader;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -245,7 +245,8 @@ public class Shell extends ShellOptions 
     byte[] pass;
     try {
       if (!cl.hasOption(fakeOption.getLongOpt())) {
-        DistributedTrace.enable(instance, new ZooReader(instance), "shell", InetAddress.getLocalHost().getHostName());
+        DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), "shell", InetAddress.getLocalHost()
+            .getHostName());
       }
       
       Runtime.getRuntime().addShutdownHook(new Thread() {

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java Fri Jun 29 17:42:35 2012
@@ -16,195 +16,10 @@
  */
 package org.apache.accumulo.core.zookeeper;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
 
-public class ZooUtil {
-  public enum NodeExistsPolicy {
-    SKIP, OVERWRITE, FAIL
-  }
-  
-  public enum NodeMissingPolicy {
-    SKIP, CREATE, FAIL
-  }
-  
-  public static class LockID {
-    public long eid;
-    public String path;
-    public String node;
-    
-    public LockID(String root, String serializedLID) {
-      String sa[] = serializedLID.split("\\$");
-      int lastSlash = sa[0].lastIndexOf('/');
-      
-      if (sa.length != 2 || lastSlash < 0) {
-        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
-      }
-      
-      if (lastSlash == 0)
-        path = root;
-      else
-        path = root + "/" + sa[0].substring(0, lastSlash);
-      node = sa[0].substring(lastSlash + 1);
-      eid = Long.parseLong(sa[1], 16);
-    }
-    
-    public LockID(String path, String node, long eid) {
-      this.path = path;
-      this.node = node;
-      this.eid = eid;
-    }
-    
-    public String serialize(String root) {
-      
-      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
-    }
-    
-    @Override
-    public String toString() {
-      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
-    }
-  }
-  
-  public static final List<ACL> PRIVATE;
-  public static final List<ACL> PUBLIC;
-  static {
-    PRIVATE = new ArrayList<ACL>();
-    PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
-    PUBLIC = new ArrayList<ACL>();
-    PUBLIC.addAll(PRIVATE);
-    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
-  }
-  
-  /**
-   * This method will delete a node and all its children from zookeeper
-   * 
-   * @param zPath
-   *          the path to delete
-   */
-  public static void recursiveDelete(ZooKeeper zk, String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
-    if (policy.equals(NodeMissingPolicy.CREATE))
-      throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
-    try {
-      for (String child : zk.getChildren(zPath, false))
-        recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
-      
-      Stat stat;
-      if ((stat = zk.exists(zPath, null)) != null)
-        zk.delete(zPath, stat.getVersion());
-    } catch (KeeperException e) {
-      if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
-        return;
-      throw e;
-    }
-  }
-  
-  public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
-    recursiveDelete(zk, zPath, -1, policy);
-  }
-  
-  /**
-   * Create a persistent node with the default ACL
-   * 
-   * @return true if the node was created or altered; false if it was skipped
-   */
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
-  }
-  
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
-      InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
-  }
-  
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
-      throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
-  }
-  
-  private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
-      throws KeeperException, InterruptedException {
-    if (policy == null)
-      policy = NodeExistsPolicy.FAIL;
-    
-    while (true) {
-      try {
-        zk.create(zPath, data, acls, mode);
-        return true;
-      } catch (NodeExistsException nee) {
-        switch (policy) {
-          case SKIP:
-            return false;
-          case OVERWRITE:
-            try {
-              zk.setData(zPath, data, version);
-              return true;
-            } catch (NoNodeException nne) {
-              // node delete between create call and set data, so try create call again
-              continue;
-            }
-          default:
-            throw nee;
-        }
-      }
-    }
-  }
-  
-  public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
-    return zk.getData(zPath, false, stat);
-  }
-  
-  public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
-    return zk.exists(zPath, false);
-  }
-  
-  public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
-    return getStatus(zk, zPath) != null;
-  }
-  
-  public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
-      InterruptedException {
-    Stat stat = null;
-    if (!exists(zk, source))
-      throw KeeperException.create(Code.NONODE, source);
-    if (exists(zk, destination)) {
-      switch (policy) {
-        case OVERWRITE:
-          break;
-        case SKIP:
-          return;
-        case FAIL:
-        default:
-          throw KeeperException.create(Code.NODEEXISTS, source);
-      }
-    }
-    
-    stat = new Stat();
-    byte[] data = zk.getData(source, false, stat);
-    if (stat.getEphemeralOwner() == 0) {
-      if (data == null)
-        throw KeeperException.create(Code.NONODE, source);
-      putPersistentData(zk, destination, data, policy);
-      if (stat.getNumChildren() > 0)
-        for (String child : zk.getChildren(source, false))
-          recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
-    }
-  }
-  
+public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil {
   public static String getRoot(Instance instance) {
     return getRoot(instance.getInstanceID());
   }
@@ -212,51 +27,4 @@ public class ZooUtil {
   public static String getRoot(String instanceId) {
     return Constants.ZROOT + "/" + instanceId;
   }
-  
-  public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
-  }
-  
-  public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
-  }
-  
-  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
-  }
-  
-  public static byte[] getLockData(ZooCache zc, String path) {
-    
-    List<String> children = zc.getChildren(path);
-    
-    if (children.size() == 0) {
-      return null;
-    }
-    
-    children = new ArrayList<String>(children);
-    Collections.sort(children);
-    
-    String lockNode = children.get(0);
-    
-    return zc.get(path + "/" + lockNode);
-  }
-  
-  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
-    
-    List<String> children = zk.getChildren(lid.path, false);
-    
-    if (children.size() == 0) {
-      return false;
-    }
-    
-    Collections.sort(children);
-    
-    String lockNode = children.get(0);
-    if (!lid.node.equals(lockNode))
-      return false;
-    
-    Stat stat = zk.exists(lid.path + "/" + lid.node, false);
-    return stat != null && stat.getEphemeralOwner() == lid.eid;
-  }
-  
 }

Added: accumulo/trunk/fate/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/pom.xml?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/pom.xml (added)
+++ accumulo/trunk/fate/pom.xml Fri Jun 29 17:42:35 2012
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <parent>
+    <groupId>org.apache.accumulo</groupId>
+    <artifactId>accumulo</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>accumulo-fate</artifactId>
+  <name>accumulo-fate</name>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-jar-plugin</artifactId>
+          <configuration>
+            <outputDirectory>../lib</outputDirectory>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>

Propchange: accumulo/trunk/fate/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.fate.TStore.TStatus;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A utility to administer FATE operations
+ */
+public class AdminUtil<T> {
+  public void print(ZooStore<T> zs, IZooReaderWriter zk, String lockPath) throws KeeperException, InterruptedException {
+    Map<Long,List<String>> heldLocks = new HashMap<Long,List<String>>();
+    Map<Long,List<String>> waitingLocks = new HashMap<Long,List<String>>();
+    
+    List<String> lockedIds = zk.getChildren(lockPath);
+    
+    for (String id : lockedIds) {
+      try {
+        List<String> lockNodes = zk.getChildren(lockPath + "/" + id);
+        lockNodes = new ArrayList<String>(lockNodes);
+        Collections.sort(lockNodes);
+        
+        int pos = 0;
+        boolean sawWriteLock = false;
+        
+        for (String node : lockNodes) {
+          try {
+            byte[] data = zk.getData(lockPath + "/" + id + "/" + node, null);
+            String lda[] = new String(data).split(":");
+            
+            if (lda[0].charAt(0) == 'W')
+              sawWriteLock = true;
+            
+            Map<Long,List<String>> locks;
+            
+            if (pos == 0) {
+              locks = heldLocks;
+            } else {
+              if (lda[0].charAt(0) == 'R' && !sawWriteLock) {
+                locks = heldLocks;
+              } else {
+                locks = waitingLocks;
+              }
+            }
+            
+            List<String> tables = locks.get(Long.parseLong(lda[1], 16));
+            if (tables == null) {
+              tables = new ArrayList<String>();
+              locks.put(Long.parseLong(lda[1], 16), tables);
+            }
+            
+            tables.add(lda[0].charAt(0) + ":" + id);
+            
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+          pos++;
+        }
+        
+      } catch (Exception e) {
+        e.printStackTrace();
+        System.err.println("Failed to read locks for " + id + " continuing");
+      }
+    }
+    
+    List<Long> transactions = zs.list();
+    
+    for (Long tid : transactions) {
+      
+      zs.reserve(tid);
+      
+      String debug = (String) zs.getProperty(tid, "debug");
+      
+      List<String> hlocks = heldLocks.remove(tid);
+      if (hlocks == null)
+        hlocks = Collections.emptyList();
+      
+      List<String> wlocks = waitingLocks.remove(tid);
+      if (wlocks == null)
+        wlocks = Collections.emptyList();
+      
+      String top = null;
+      Repo<T> repo = zs.top(tid);
+      if (repo != null)
+        top = repo.getDescription();
+      
+      TStatus status = null;
+      status = zs.getStatus(tid);
+      
+      zs.unreserve(tid, 0);
+      
+      System.out.printf("txid: %016x  status: %-18s  op: %-15s  locked: %-15s locking: %-15s top: %s\n", tid, status, debug, hlocks, wlocks, top);
+    }
+    
+    if (heldLocks.size() != 0 || waitingLocks.size() != 0) {
+      System.out.println();
+      System.out.println("The following locks did not have an associated FATE operation");
+      System.out.println();
+      for (Entry<Long,List<String>> entry : heldLocks.entrySet())
+        System.out.printf("txid: %016x  locked: %s\n", entry.getKey(), entry.getValue());
+      
+      for (Entry<Long,List<String>> entry : waitingLocks.entrySet())
+        System.out.printf("txid: %016x  locking: %s\n", entry.getKey(), entry.getValue());
+    }
+  }
+  
+  public void prepDelete(ZooStore<T> zs, String path, String txidStr) {
+    checkGlobalLock(path);
+    
+    long txid = Long.parseLong(txidStr, 16);
+    zs.reserve(txid);
+    zs.delete(txid);
+    zs.unreserve(txid, 0);
+  }
+  
+  public void prepFail(ZooStore<T> zs, String path, String txidStr) {
+    checkGlobalLock(path);
+    
+    long txid = Long.parseLong(txidStr, 16);
+    zs.reserve(txid);
+    zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+    zs.unreserve(txid, 0);
+  }
+  
+  public void deleteLocks(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) throws KeeperException, InterruptedException {
+    // delete any locks assoc w/ fate operation
+    List<String> lockedIds = zk.getChildren(path);
+    
+    for (String id : lockedIds) {
+      List<String> lockNodes = zk.getChildren(path + "/" + id);
+      for (String node : lockNodes) {
+        String lockPath = path + "/" + id + "/" + node;
+        byte[] data = zk.getData(path + "/" + id + "/" + node, null);
+        String lda[] = new String(data).split(":");
+        if (lda[1].equals(txidStr))
+          zk.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+      }
+    }
+  }
+  
+  public void checkGlobalLock(String path) {
+    if (ZooLock.getLockData(path) != null) {
+      System.err.println("ERROR: Master lock is held, not running");
+      System.exit(-1);
+    }
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.util.EnumSet;
+
+import org.apache.accumulo.fate.TStore.TStatus;
+import org.apache.accumulo.fate.util.Daemon;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.log4j.Logger;
+
+/**
+ * Fault tolerant executor
+ * 
+ * 
+ */
+
+public class Fate<T> {
+  
+  private static final String DEBUG_PROP = "debug";
+  private static final String AUTO_CLEAN_PROP = "autoClean";
+  private static final String EXCEPTION_PROP = "exception";
+  private static final String RETURN_PROP = "return";
+  
+  final private static Logger log = Logger.getLogger(Fate.class);
+  
+  private TStore<T> store;
+  private T environment;
+  
+  private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(TStatus.FAILED, TStatus.SUCCESSFUL, TStatus.UNKNOWN);
+  
+  private class TransactionRunner implements Runnable {
+    
+    @Override
+    public void run() {
+      while (true) {
+        long deferTime = 0;
+        long tid = store.reserve();
+        try {
+          TStatus status = store.getStatus(tid);
+          Repo<T> op = store.top(tid);
+          if (status == TStatus.FAILED_IN_PROGRESS) {
+            processFailed(tid, op);
+          } else {
+            Repo<T> prevOp = null;
+            try {
+              deferTime = op.isReady(tid, environment);
+              if (deferTime == 0) {
+                prevOp = op;
+                op = op.call(tid, environment);
+              } else
+                continue;
+              
+            } catch (Exception e) {
+              transitionToFailed(tid, op, e);
+              continue;
+            }
+            
+            if (op == null) {
+              // transaction is finished
+              String ret = prevOp.getReturn();
+              if (ret != null)
+                store.setProperty(tid, RETURN_PROP, ret);
+              store.setStatus(tid, TStatus.SUCCESSFUL);
+              doCleanUp(tid);
+            } else {
+              try {
+                store.push(tid, op);
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never executed, so no need to undo it
+                // just transition to failed and undo the ops that executed
+                transitionToFailed(tid, op, e);
+                continue;
+              }
+            }
+          }
+        } finally {
+          store.unreserve(tid, deferTime);
+        }
+        
+      }
+    }
+    
+    private void transitionToFailed(long tid, Repo<T> op, Exception e) {
+      store.setProperty(tid, EXCEPTION_PROP, e);
+      store.setStatus(tid, TStatus.FAILED_IN_PROGRESS);
+      log.warn("Failed to execute Repo, tid=" + String.format("%016x", tid), e);
+    }
+    
+    private void processFailed(long tid, Repo<T> op) {
+      while (op != null) {
+        undo(tid, op);
+        
+        store.pop(tid);
+        op = store.top(tid);
+      }
+      
+      store.setStatus(tid, TStatus.FAILED);
+      doCleanUp(tid);
+    }
+    
+    private void doCleanUp(long tid) {
+      Boolean autoClean = (Boolean) store.getProperty(tid, AUTO_CLEAN_PROP);
+      if (autoClean != null && autoClean) {
+        store.delete(tid);
+      } else {
+        // no longer need persisted operations, so delete them to save space in case
+        // TX is never cleaned up...
+        while (store.top(tid) != null)
+          store.pop(tid);
+      }
+    }
+    
+    private void undo(long tid, Repo<T> op) {
+      try {
+        op.undo(tid, environment);
+      } catch (Exception e) {
+        log.warn("Failed to undo Repo, tid=" + String.format("%016x", tid), e);
+      }
+    }
+    
+  }
+  
+  public Fate(T environment, TStore<T> store, int numTreads) {
+    this.store = store;
+    this.environment = environment;
+    
+    for (int i = 0; i < numTreads; i++) {
+      // TODO: use a ExecutorService, maybe a utility to do these steps throughout the server packages
+      Thread thread = new Daemon(new LoggingRunnable(log, new TransactionRunner()), "Repo runner " + i);
+      thread.start();
+    }
+  }
+  
+  // get a transaction id back to the requester before doing any work
+  public long startTransaction() {
+    long dir = store.create();
+    return dir;
+  }
+  
+  // start work in the transaction.. it is safe to call this
+  // multiple times for a transaction... but it will only seed once
+  public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp) {
+    store.reserve(tid);
+    try {
+      if (store.getStatus(tid) == TStatus.NEW) {
+        if (store.top(tid) == null) {
+          try {
+            store.push(tid, repo);
+          } catch (StackOverflowException e) {
+            // this should not happen
+            throw new RuntimeException(e);
+          }
+        }
+        
+        if (autoCleanUp)
+          store.setProperty(tid, AUTO_CLEAN_PROP, Boolean.TRUE); // always true in this branch; reuse the canonical instance instead of allocating one
+        
+        store.setProperty(tid, DEBUG_PROP, repo.getDescription());
+        
+        store.setStatus(tid, TStatus.IN_PROGRESS);
+      }
+    } finally {
+      store.unreserve(tid, 0);
+    }
+    
+  }
+  
+  // check on the transaction
+  public TStatus waitForCompletion(long tid) {
+    return store.waitForStatusChange(tid, FINISHED_STATES);
+  }
+  
+  // resource cleanup
+  public void delete(long tid) {
+    store.reserve(tid);
+    try {
+      switch (store.getStatus(tid)) {
+        case NEW:
+        case FAILED:
+        case SUCCESSFUL:
+          store.delete(tid);
+          break;
+        case FAILED_IN_PROGRESS:
+        case IN_PROGRESS:
+          throw new IllegalStateException("Can not delete in progress transaction " + String.format("%016x", tid));
+        case UNKNOWN:
+          // nothing to do, it does not exist
+          break;
+      }
+    } finally {
+      store.unreserve(tid, 0);
+    }
+  }
+  
+  public String getReturn(long tid) { // value stored under RETURN_PROP; only readable once the transaction is SUCCESSFUL
+    store.reserve(tid);
+    try {
+      if (store.getStatus(tid) != TStatus.SUCCESSFUL)
+        throw new IllegalStateException("Tried to get return value when transaction " + String.format("%016x", tid) + " not in successful state");
+      return (String) store.getProperty(tid, RETURN_PROP);
+    } finally {
+      store.unreserve(tid, 0);
+    }
+  }
+  
+  // get reportable failures
+  public Exception getException(long tid) {
+    store.reserve(tid);
+    try {
+      if (store.getStatus(tid) != TStatus.FAILED)
+        throw new IllegalStateException("Tried to get exception when transaction " + String.format("%016x", tid) + " not in failed state");
+      return (Exception) store.getProperty(tid, EXCEPTION_PROP);
+    } finally {
+      store.unreserve(tid, 0);
+    }
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Fate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+
+/**
+ * Repeatable persisted operation
+ * 
+ */
+public interface Repo<T> extends Serializable {
+  long isReady(long tid, T environment) throws Exception;
+  
+  Repo<T> call(long tid, T environment) throws Exception;
+  
+  void undo(long tid, T environment) throws Exception;
+  
+  String getDescription();
+  
+  // this allows the last fate op to return something to the user
+  String getReturn();
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/Repo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+public class StackOverflowException extends Exception {
+  
+  public StackOverflowException(String msg) {
+    super(msg);
+  }
+  
+  private static final long serialVersionUID = 1L;
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/StackOverflowException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+/**
+ * Transaction Store: a place to save transactions
+ * 
+ * A transaction consists of a number of operations. To use, first create a transaction id, and then seed the transaction with an initial operation. An executor
+ * service can then execute the transaction's operation, possibly pushing more operations onto the transaction as each step successfully completes. If a step
+ * fails, the stack can be unwound, undoing each operation.
+ */
+public interface TStore<T> {
+  
+  public enum TStatus {
+    /** Unseeded transaction */
+    NEW,
+    /** Transaction is eligible to be executing */
+    IN_PROGRESS,
+    /** Transaction has failed, and is in the process of being rolled back */
+    FAILED_IN_PROGRESS,
+    /** Transaction has failed and has been fully rolled back */
+    FAILED,
+    /** Transaction has succeeded */
+    SUCCESSFUL, UNKNOWN
+  }
+  
+  /**
+   * Create a new transaction id
+   * 
+   * @return a transaction id
+   */
+  public long create();
+  
+  /**
+   * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS.
+   * 
+   */
+  long reserve();
+  
+  public void reserve(long tid);
+  
+  /**
+   * Return the given transaction to the store
+   * 
+   * @param tid
+   * @param deferTime
+   */
+  void unreserve(long tid, long deferTime);
+  
+  /**
+   * Get the current operation for the given transaction id.
+   * 
+   * @param tid
+   *          transaction id
+   * @return the operation
+   */
+  Repo<T> top(long tid);
+  
+  /**
+   * Update the given transaction with the next operation
+   * 
+   * @param tid
+   *          the transaction id
+   * @param repo
+   *          the operation
+   */
+  public void push(long tid, Repo<T> repo) throws StackOverflowException;
+  
+  /**
+   * Remove the last pushed operation from the given transaction.
+   * 
+   * @param tid
+   */
+  void pop(long tid);
+  
+  /**
+   * Get the state of a given transaction.
+   * 
+   * @param tid
+   *          transaction id
+   * @return execution status
+   */
+  public TStatus getStatus(long tid);
+  
+  /**
+   * Update the state of a given transaction
+   * 
+   * @param tid
+   *          transaction id
+   * @param status
+   *          execution status
+   */
+  public void setStatus(long tid, TStatus status);
+  
+  /**
+   * Wait for the status of a transaction to change
+   * 
+   * @param tid
+   *          transaction id
+   */
+  public TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected);
+  
+  public void setProperty(long tid, String prop, Serializable val);
+  
+  public Serializable getProperty(long tid, String prop);
+  
+  /**
+   * Remove the transaction from the store.
+   * 
+   * @param tid
+   *          the transaction id
+   */
+  public void delete(long tid);
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+//TODO use zoocache?
+//TODO handle zookeeper being down gracefully
+//TODO document zookeeper layout
+
+public class ZooStore<T> implements TStore<T> {
+  
+  private String path;
+  private IZooReaderWriter zk;
+  private Set<Long> reserved;
+  private Map<Long,Long> defered;
+  private SecureRandom idgenerator;
+  private long statusChangeEvents = 0;
+  private int reservationsWaiting = 0;
+  
+  private byte[] serialize(Object o) {
+    
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(o);
+      oos.close();
+      
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    
+  }
+  
+  private Object deserialize(byte ser[]) {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      return ois.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  private String getTXPath(long tid) {
+    return String.format("%s/tx_%016x", path, tid);
+  }
+  
+  private long parseTid(String txdir) {
+    return Long.parseLong(txdir.split("_")[1], 16);
+  }
+  
+  public ZooStore(String path, IZooReaderWriter zk) throws KeeperException, InterruptedException {
+    
+    this.path = path;
+    this.zk = zk;
+    this.reserved = new HashSet<Long>();
+    this.defered = new HashMap<Long,Long>();
+    this.idgenerator = new SecureRandom();
+    
+    zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
+  }
+  
+  @Override
+  public long create() {
+    while (true) {
+      try {
+        // looking at the code for SecureRandom, it appears to be thread safe
+        long tid = idgenerator.nextLong() & Long.MAX_VALUE; // clear the sign bit: Math.abs(Long.MIN_VALUE) is negative, and parseTid() cannot read a negative id back
+        zk.putPersistentData(getTXPath(tid), TStatus.NEW.name().getBytes(), NodeExistsPolicy.FAIL);
+        return tid;
+      } catch (NodeExistsException nee) {
+        // exist, so just try another random #
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  @Override
+  public long reserve() {
+    try {
+      while (true) {
+        
+        long events;
+        synchronized (this) {
+          events = statusChangeEvents;
+        }
+        
+        List<String> txdirs = zk.getChildren(path);
+        
+        for (String txdir : txdirs) {
+          long tid = parseTid(txdir);
+          
+          synchronized (this) {
+            if (defered.containsKey(tid)) {
+              if (defered.get(tid) < System.currentTimeMillis())
+                defered.remove(tid);
+              else
+                continue;
+            }
+            if (!reserved.contains(tid))
+              reserved.add(tid);
+            else
+              continue;
+          }
+          
+          // have reserved id, status should not change
+          
+          try {
+            TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir, null)));
+            if (status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS) {
+              return tid;
+            } else {
+              unreserve(tid);
+            }
+          } catch (NoNodeException nne) {
+            // node deleted after we got the list of children, its ok
+            unreserve(tid);
+          } catch (Exception e) {
+            unreserve(tid);
+            throw e;
+          }
+        }
+        
+        synchronized (this) {
+          if (events == statusChangeEvents) {
+            if (defered.size() > 0) {
+              Long minTime = Collections.min(defered.values());
+              long waitTime = minTime - System.currentTimeMillis();
+              if (waitTime > 0)
+                this.wait(Math.min(waitTime, 5000));
+            } else
+              this.wait(5000);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public void reserve(long tid) {
+    synchronized (this) {
+      reservationsWaiting++;
+      try {
+        while (reserved.contains(tid))
+          try {
+            this.wait(1000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        
+        reserved.add(tid);
+      } finally {
+        reservationsWaiting--;
+      }
+    }
+  }
+  
+  /**
+   * Releases a reservation without deferring the transaction.
+   * 
+   * @param tid
+   *          id of the transaction to release
+   * @throws IllegalStateException
+   *           if {@code tid} was not present in {@code reserved}
+   */
+  private void unreserve(long tid) {
+    synchronized (this) {
+      if (!reserved.remove(tid))
+        throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid));
+      
+      // do not want this unreserve to unnecessarily wake up threads waiting on this monitor... doing so leads to an infinite loop when a tx is stuck in NEW...
+      // only notify when something external has called reserve(tid) and is currently waiting
+      if (reservationsWaiting > 0)
+        this.notifyAll();
+    }
+  }
+  
+  /**
+   * Releases the reservation on {@code tid} and optionally postpones the transaction.
+   * 
+   * When {@code deferTime} is positive, the current time plus {@code deferTime} milliseconds is recorded for the id in {@code defered}. All threads waiting on
+   * this object's monitor are notified in every case.
+   * 
+   * @param tid
+   *          id of the transaction to release
+   * @param deferTime
+   *          delay in milliseconds, must not be negative; zero means no deferral is recorded
+   * @throws IllegalArgumentException
+   *           if {@code deferTime} is negative
+   * @throws IllegalStateException
+   *           if {@code tid} was not reserved
+   */
+  @Override
+  public void unreserve(long tid, long deferTime) {
+    if (deferTime < 0) {
+      throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
+    }
+    
+    synchronized (this) {
+      final boolean wasReserved = reserved.remove(tid);
+      if (!wasReserved) {
+        throw new IllegalStateException("Tried to unreserve id that was not reserved " + String.format("%016x", tid));
+      }
+      
+      if (deferTime > 0) {
+        final long notBefore = System.currentTimeMillis() + deferTime;
+        defered.put(tid, notBefore);
+      }
+      
+      this.notifyAll();
+    }
+  }
+  
+  private void verifyReserved(long tid) {
+    synchronized (this) {
+      if (!reserved.contains(tid))
+        throw new IllegalStateException("Tried to operate on unreserved transaction " + String.format("%016x", tid));
+    }
+  }
+  
+  /**
+   * Returns the repo stored in the child of the transaction node that {@link #findTop(String)} selects, without removing it.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @return the deserialized repo, or {@code null} when the transaction has no {@code repo_} child
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised while reading or deserializing
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Repo<T> top(long tid) {
+    verifyReserved(tid);
+    
+    try {
+      final String txNode = getTXPath(tid);
+      final String newest = findTop(txNode);
+      if (newest != null) {
+        final byte[] serialized = zk.getData(txNode + "/" + newest, null);
+        return (Repo<T>) deserialize(serialized);
+      }
+      return null;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Finds the child of {@code txpath} whose name starts with {@code repo_} and sorts highest in {@link String#compareTo(String)} order.
+   * 
+   * @param txpath
+   *          path of the transaction node whose children are listed
+   * @return the name of that child (not its full path), or {@code null} if no child starts with {@code repo_}
+   */
+  private String findTop(String txpath) throws KeeperException, InterruptedException {
+    String highest = null;
+    
+    for (String child : zk.getChildren(txpath)) {
+      if (!child.startsWith("repo_"))
+        continue;
+      // the first repo_ child always wins; later ones must sort strictly higher
+      if (highest == null || child.compareTo(highest) > 0)
+        highest = child;
+    }
+    
+    return highest;
+  }
+  
+  /**
+   * Stores {@code repo} in a new persistent sequential child named {@code repo_...} under the transaction node.
+   * 
+   * The push is refused when the number parsed from the name of the current top child (the text after the first underscore) is greater than 100.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @param repo
+   *          the repo to serialize and store
+   * @throws StackOverflowException
+   *           if the current top child carries a number greater than 100
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any other exception raised while reading or writing
+   */
+  @Override
+  public void push(long tid, Repo<T> repo) throws StackOverflowException {
+    verifyReserved(tid);
+    
+    final String txNode = getTXPath(tid);
+    try {
+      final String current = findTop(txNode);
+      if (current != null) {
+        final long sequence = Long.parseLong(current.split("_")[1]);
+        if (sequence > 100) {
+          throw new StackOverflowException("Repo stack size too large");
+        }
+      }
+      
+      zk.putPersistentSequential(txNode + "/repo_", serialize(repo));
+    } catch (StackOverflowException soe) {
+      // let the declared exception through untouched instead of wrapping it below
+      throw soe;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Deletes the child of the transaction node that {@link #findTop(String)} selects.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised inside the operation, including the {@link IllegalStateException} raised when there is no {@code repo_} child
+   */
+  @Override
+  public void pop(long tid) {
+    verifyReserved(tid);
+    
+    try {
+      final String txNode = getTXPath(tid);
+      final String newest = findTop(txNode);
+      if (newest == null) {
+        throw new IllegalStateException("Tried to pop when empty " + tid);
+      }
+      final String victim = txNode + "/" + newest;
+      zk.recursiveDelete(victim, NodeMissingPolicy.SKIP);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  private TStatus _getStatus(long tid) {
+    try {
+      return TStatus.valueOf(new String(zk.getData(getTXPath(tid), null)));
+    } catch (NoNodeException nne) {
+      return TStatus.UNKNOWN;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Returns the stored status of a reserved transaction.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @return the stored status, or {@link TStatus#UNKNOWN} when the transaction node does not exist
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   */
+  @Override
+  public TStatus getStatus(long tid) {
+    verifyReserved(tid);
+    final TStatus current = _getStatus(tid);
+    return current;
+  }
+  
+  /**
+   * Blocks until the status of {@code tid} is one of {@code expected}. Unlike the other operations this method does not require the transaction to be reserved.
+   * 
+   * @param tid
+   *          id of the transaction to watch
+   * @param expected
+   *          statuses that end the wait
+   * @return the first observed status contained in {@code expected}
+   * @throws RuntimeException
+   *           wrapping the {@link InterruptedException} if the thread is interrupted while waiting
+   */
+  @Override
+  public TStatus waitForStatusChange(long tid, EnumSet<TStatus> expected) {
+    while (true) {
+      long events;
+      // snapshot the event counter before reading the status
+      synchronized (this) {
+        events = statusChangeEvents;
+      }
+      
+      TStatus status = _getStatus(tid);
+      if (expected.contains(status))
+        return status;
+      
+      synchronized (this) {
+        // only sleep if no status was set since the snapshot; otherwise loop around and read again immediately
+        if (events == statusChangeEvents) {
+          try {
+            // bounded wait, so the status is re-read at least every five seconds
+            this.wait(5000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Overwrites the data of the transaction node with the name of {@code status}, then increments {@code statusChangeEvents}.
+   * 
+   * This method itself does not notify threads waiting on this object's monitor.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @param status
+   *          the status to store
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised while writing
+   */
+  @Override
+  public void setStatus(long tid, TStatus status) {
+    verifyReserved(tid);
+    
+    try {
+      final String txNode = getTXPath(tid);
+      final byte[] encoded = status.name().getBytes();
+      zk.putPersistentData(txNode, encoded, NodeExistsPolicy.OVERWRITE);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    
+    // record the change only after the write succeeded
+    synchronized (this) {
+      statusChangeEvents++;
+    }
+  }
+  
+  /**
+   * Recursively deletes the node of a reserved transaction; a node that is already missing is skipped rather than reported.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised while deleting
+   */
+  @Override
+  public void delete(long tid) {
+    verifyReserved(tid);
+    
+    try {
+      final String txNode = getTXPath(tid);
+      zk.recursiveDelete(txNode, NodeMissingPolicy.SKIP);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Stores a property of a reserved transaction in the child node {@code prop_<prop>}, overwriting any existing value.
+   * 
+   * The stored bytes start with a two byte tag: {@code "S "} followed by the bytes of the string for {@link String} values, or {@code "O "} followed by the
+   * serialized form for every other value.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @param prop
+   *          property name, used as the suffix of the node name
+   * @param so
+   *          the value to store
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised while serializing or writing
+   */
+  @Override
+  public void setProperty(long tid, String prop, Serializable so) {
+    verifyReserved(tid);
+    
+    try {
+      final byte[] payload;
+      if (so instanceof String) {
+        payload = ("S " + so).getBytes();
+      } else {
+        final byte[] body = serialize(so);
+        payload = new byte[body.length + 2];
+        payload[0] = 'O';
+        payload[1] = ' ';
+        System.arraycopy(body, 0, payload, 2, body.length);
+      }
+      
+      zk.putPersistentData(getTXPath(tid) + "/prop_" + prop, payload, NodeExistsPolicy.OVERWRITE);
+    } catch (Exception e2) {
+      throw new RuntimeException(e2);
+    }
+  }
+  
+  /**
+   * Reads a property of a reserved transaction from the child node {@code prop_<prop>}.
+   * 
+   * The stored bytes are expected to start with a two byte tag: {@code 'O'} plus one separator byte followed by a serialized object, or {@code 'S'} plus one
+   * separator byte followed by the bytes of a string. Only the first byte is inspected; the second is skipped.
+   * 
+   * @param tid
+   *          id of a reserved transaction
+   * @param prop
+   *          property name, used as the suffix of the node name
+   * @return the stored value, or {@code null} when the property node does not exist
+   * @throws IllegalStateException
+   *           if {@code tid} is not reserved
+   * @throws RuntimeException
+   *           wrapping any exception raised while reading or decoding, including the {@link IllegalStateException} raised for data that is missing, shorter
+   *           than the two byte tag, or tagged with an unknown first byte
+   */
+  @Override
+  public Serializable getProperty(long tid, String prop) {
+    verifyReserved(tid);
+    
+    try {
+      byte[] data = zk.getData(getTXPath(tid) + "/prop_" + prop, null);
+      
+      // Reject payloads that cannot even hold the two byte tag. Without this check they would fail further down with a null pointer, index or
+      // negative array size error that says nothing about the property being malformed.
+      if (data == null || data.length < 2)
+        throw new IllegalStateException("Bad property data " + prop);
+      
+      if (data[0] == 'O') {
+        byte[] sera = new byte[data.length - 2];
+        System.arraycopy(data, 2, sera, 0, sera.length);
+        return (Serializable) deserialize(sera);
+      } else if (data[0] == 'S') {
+        return new String(data, 2, data.length - 2);
+      } else {
+        throw new IllegalStateException("Bad property data " + prop);
+      }
+    } catch (NoNodeException nne) {
+      return null;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /**
+   * Lists the ids of all transactions that currently have a child node under {@code path}.
+   * 
+   * @return a new list holding one id per child, in the order the children were returned
+   * @throws RuntimeException
+   *           wrapping any exception raised while listing or parsing
+   */
+  public List<Long> list() {
+    try {
+      final ArrayList<Long> tids = new ArrayList<Long>();
+      for (String child : zk.getChildren(path)) {
+        tids.add(parseTid(child));
+      }
+      return tids;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.util;
+
+/**
+ * A {@link Thread} that is marked as a daemon thread as part of its construction.
+ * 
+ * Each constructor forwards its arguments to the {@link Thread} constructor with the same parameter list.
+ */
+public class Daemon extends Thread {
+  
+  // Instance initializer: runs in every constructor right after the chosen Thread constructor returns, so no constructor can forget the daemon flag.
+  {
+    setDaemon(true);
+  }
+  
+  public Daemon() {
+    super();
+  }
+  
+  public Daemon(Runnable target) {
+    super(target);
+  }
+  
+  public Daemon(String name) {
+    super(name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target) {
+    super(group, target);
+  }
+  
+  public Daemon(ThreadGroup group, String name) {
+    super(group, name);
+  }
+  
+  public Daemon(Runnable target, String name) {
+    super(target, name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target, String name) {
+    super(group, target, name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target, String name, long stackSize) {
+    super(group, target, name, stackSize);
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/Daemon.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.util;
+
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+public class LoggingRunnable implements Runnable {
+  private Runnable runnable;
+  private Logger log;
+  
+  public LoggingRunnable(Logger log, Runnable r) {
+    this.runnable = r;
+    this.log = log;
+  }
+  
+  public void run() {
+    try {
+      runnable.run();
+    } catch (Throwable t) {
+      try {
+        log.error("Thread \"" + Thread.currentThread().getName() + "\" died " + t.getMessage(), t);
+      } catch (Throwable t2) {
+        // maybe the logging system is screwed up OR there is a bug in the exception, like t.getMessage() throws a NPE
+        System.err.println("ERROR " + new Date() + " Failed to log message about thread death " + t2.getMessage());
+        t2.printStackTrace();
+        
+        // try to print original exception
+        System.err.println("ERROR " + new Date() + " Exception that failed to log : " + t.getMessage());
+        t.printStackTrace();
+      }
+    }
+  }
+  
+  public static void main(String[] args) {
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        int x[] = new int[0];
+        
+        x[0]++;
+      }
+    };
+    
+    LoggingRunnable lr = new LoggingRunnable(null, r);
+    lr.run();
+    
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/LoggingRunnable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.util;
+
+import org.apache.log4j.Logger;
+
+public class UtilWaitThread {
+  private static final Logger log = Logger.getLogger(UtilWaitThread.class);
+  
+  public static void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      log.error(e.getMessage(), e);
+    }
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/util/UtilWaitThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.log4j.Logger;
+
+/**
+ * A ReadWriteLock that can be implemented in ZooKeeper. Features the ability to store data with the lock, and recover the lock using that data to find the
+ * lock.
+ * 
+ * All coordination goes through a {@link QueueLock}: a lock request adds an entry to the queue and then polls the entries that are ahead of it.
+ */
+public class DistributedReadWriteLock implements java.util.concurrent.locks.ReadWriteLock {
+  
+  static enum LockType {
+    READ, WRITE,
+  };
+  
+  // serializer for lock type and user data; the encoded form is the type name, a ':' byte, then the user data
+  static class ParsedLock {
+    private final LockType type;
+    private final byte[] userData;
+    
+    public ParsedLock(LockType type, byte[] userData) {
+      this.type = type;
+      this.userData = Arrays.copyOf(userData, userData.length);
+    }
+    
+    /**
+     * Decodes the form produced by {@link #getLockData()}.
+     * 
+     * @throws IllegalArgumentException
+     *           if the data is null, empty, contains no ':' separator, or the text before the separator is not a {@link LockType} name
+     */
+    public ParsedLock(byte[] lockData) {
+      if (lockData == null || lockData.length < 1)
+        throw new IllegalArgumentException();
+      
+      // the type name ends at the first ':'; everything after it is user data and may itself contain ':'
+      int split = -1;
+      for (int i = 0; i < lockData.length; i++) {
+        if (lockData[i] == ':') {
+          split = i;
+          break;
+        }
+      }
+      
+      if (split == -1)
+        throw new IllegalArgumentException();
+      
+      this.type = LockType.valueOf(new String(lockData, 0, split));
+      this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length);
+    }
+    
+    public LockType getType() {
+      return type;
+    }
+    
+    public byte[] getUserData() {
+      return userData;
+    }
+    
+    public byte[] getLockData() {
+      byte typeBytes[] = type.name().getBytes();
+      byte[] result = new byte[userData.length + 1 + typeBytes.length];
+      System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
+      result[typeBytes.length] = ':';
+      System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length);
+      return result;
+    }
+  }
+  
+  // This kind of lock can be easily implemented by ZooKeeper
+  // You make an entry at the bottom of the queue, readers run when there are no writers ahead of them,
+  // a writer only runs when they are at the top of the queue.
+  public interface QueueLock {
+    // NOTE(review): the lock implementations below expect the result to include the entry passed in - confirm against the implementation
+    SortedMap<Long,byte[]> getEarlierEntries(long entry);
+    
+    void removeEntry(long entry);
+    
+    long addEntry(byte[] data);
+  }
+  
+  public static final Logger log = Logger.getLogger(DistributedReadWriteLock.class);
+  
+  static class ReadLock implements Lock {
+    
+    QueueLock qlock;
+    byte[] userData;
+    // id of this lock's queue entry, or -1 while no entry has been added
+    long entry = -1;
+    long lastRead = -1;
+    
+    ReadLock(QueueLock qlock, byte[] userData) {
+      this.qlock = qlock;
+      this.userData = userData;
+    }
+    
+    // for recovery
+    ReadLock(QueueLock qlock, byte[] userData, long entry) {
+      this.qlock = qlock;
+      this.userData = userData;
+      this.entry = entry;
+    }
+    
+    protected LockType lockType() {
+      return LockType.READ;
+    }
+    
+    /**
+     * Blocks until the lock is held. An {@link InterruptedException} raised by an attempt does not abort the acquisition; it is remembered and the thread's
+     * interrupt status is set again just before returning, so the caller can still observe it.
+     */
+    @Override
+    public void lock() {
+      boolean interrupted = false;
+      try {
+        while (true) {
+          try {
+            if (tryLock(1, TimeUnit.DAYS))
+              return;
+          } catch (InterruptedException ex) {
+            // keep trying, but do not lose the interruption
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted)
+          Thread.currentThread().interrupt();
+      }
+    }
+    
+    /**
+     * Blocks until the lock is held or the thread is found to be interrupted.
+     * 
+     * The interrupt status is checked between attempts of 100 milliseconds each. Interruptions that arrive while an attempt sleeps are consumed by
+     * {@link UtilWaitThread#sleep(long)} and are therefore not seen here. A queue entry added by an earlier attempt stays in the queue when this method
+     * throws; call {@link #unlock()} to remove it.
+     * 
+     * @throws InterruptedException
+     *           if the thread's interrupt status is set before an attempt; the status is cleared when the exception is thrown
+     */
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+      while (true) {
+        // report the interruption instead of returning as if the lock had been acquired
+        if (Thread.interrupted())
+          throw new InterruptedException("Interrupted while waiting for lock entry " + entry + " lockType " + lockType());
+        if (tryLock(100, TimeUnit.MILLISECONDS))
+          return;
+      }
+    }
+    
+    /**
+     * Adds this lock's entry to the queue if it has none yet, then checks once whether the lock is held.
+     * 
+     * @return true if no WRITE entry precedes this entry in the queue, false otherwise; on false the entry stays queued
+     * @throws IllegalStateException
+     *           if this lock's own entry is not among the entries returned by the queue
+     */
+    @Override
+    public boolean tryLock() {
+      if (entry == -1) {
+        entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
+        log.info("Added lock entry " + entry + " userData " + new String(this.userData) + " lockType " + lockType());
+      }
+      SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
+      for (Entry<Long,byte[]> queued : entries.entrySet()) {
+        ParsedLock parsed = new ParsedLock(queued.getValue());
+        // reached our own entry without meeting a writer first: the read lock is held
+        if (queued.getKey().equals(this.entry))
+          return true;
+        if (parsed.type == LockType.WRITE)
+          return false;
+      }
+      throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData) + " lockType "
+          + lockType());
+    }
+    
+    /**
+     * Polls {@link #tryLock()} roughly every 100 milliseconds until it succeeds or the given time has elapsed. No attempt is made when the time is not
+     * positive.
+     */
+    @Override
+    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+      long now = System.currentTimeMillis();
+      long returnTime = now + TimeUnit.MILLISECONDS.convert(time, unit);
+      while (returnTime > now) {
+        if (tryLock())
+          return true;
+        // TODO: do something better than poll
+        UtilWaitThread.sleep(100);
+        now = System.currentTimeMillis();
+      }
+      return false;
+    }
+    
+    /**
+     * Removes this lock's entry from the queue; does nothing when there is no entry.
+     */
+    @Override
+    public void unlock() {
+      if (entry == -1)
+        return;
+      log.debug("Removing lock entry " + entry + " userData " + new String(this.userData) + " lockType " + lockType());
+      qlock.removeEntry(entry);
+      entry = -1;
+    }
+    
+    @Override
+    public Condition newCondition() {
+      throw new NotImplementedException();
+    }
+  }
+  
+  static class WriteLock extends ReadLock {
+    
+    WriteLock(QueueLock qlock, byte[] userData) {
+      super(qlock, userData);
+    }
+    
+    WriteLock(QueueLock qlock, byte[] userData, long entry) {
+      super(qlock, userData, entry);
+    }
+    
+    @Override
+    protected LockType lockType() {
+      return LockType.WRITE;
+    }
+    
+    /**
+     * Adds this lock's entry to the queue if it has none yet, then checks once whether the lock is held.
+     * 
+     * @return true only if this entry is the first entry returned by the queue
+     * @throws IllegalStateException
+     *           if the queue returns no entries at all
+     */
+    @Override
+    public boolean tryLock() {
+      if (entry == -1) {
+        entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
+        log.info("Added lock entry " + entry + " userData " + new String(this.userData) + " lockType " + lockType());
+      }
+      SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
+      Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
+      if (!iterator.hasNext())
+        throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + " userData " + new String(this.userData) + " lockType "
+            + lockType());
+      if (iterator.next().getKey().equals(entry))
+        return true;
+      return false;
+    }
+  }
+  
+  private QueueLock qlock;
+  private byte[] data;
+  
+  /**
+   * @param qlock
+   *          the queue used for coordination
+   * @param data
+   *          user data stored with every lock entry created through this object; copied on construction
+   */
+  public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
+    this.qlock = qlock;
+    this.data = Arrays.copyOf(data, data.length);
+  }
+  
+  /**
+   * Looks through the entries of the queue for one whose user data equals {@code data} and rebuilds the matching lock object around it.
+   * 
+   * @return a read or write lock bound to the first matching entry, or {@code null} if no entry matches
+   */
+  static public Lock recoverLock(QueueLock qlock, byte[] data) {
+    SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
+    for (Entry<Long,byte[]> entry : entries.entrySet()) {
+      ParsedLock parsed = new ParsedLock(entry.getValue());
+      if (Arrays.equals(data, parsed.getUserData())) {
+        switch (parsed.getType()) {
+          case READ:
+            return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
+          case WRITE:
+            return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
+        }
+      }
+    }
+    return null;
+  }
+  
+  @Override
+  public Lock readLock() {
+    return new ReadLock(qlock, data);
+  }
+  
+  @Override
+  public Lock writeLock() {
+    return new WriteLock(qlock, data);
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Operations for reading the data, status and children of nodes addressed by path.
+ * 
+ * NOTE(review): the behavior of each method is defined by the implementing classes, which are not part of this file; the summaries below are inferred from
+ * the signatures and should be confirmed against an implementation.
+ */
+public interface IZooReader {
+  
+  // presumably returns the data stored at zPath; the role of stat is not visible here
+  byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException;
+  
+  // status lookups, without and with a watcher
+  Stat getStatus(String zPath) throws KeeperException, InterruptedException;
+  
+  Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
+  
+  // child listings, without and with a watcher
+  List<String> getChildren(String zPath) throws KeeperException, InterruptedException;
+  
+  List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
+  
+  // existence checks, without and with a watcher
+  boolean exists(String zPath) throws KeeperException, InterruptedException;
+  
+  boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message