accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1426384 - in /accumulo/trunk: fate/src/main/java/org/apache/accumulo/fate/ fate/src/test/java/org/apache/accumulo/fate/ server/src/main/java/org/apache/accumulo/server/fate/ server/src/main/java/org/apache/accumulo/server/master/
Date Fri, 28 Dec 2012 03:36:59 GMT
Author: kturner
Date: Fri Dec 28 03:36:59 2012
New Revision: 1426384

URL: http://svn.apache.org/viewvc?rev=1426384&view=rev
Log:
ACCUMULO-785 added code to age off old finished FATE ops.  Also fixed Fate Admin util.

Added:
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
    accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
Modified:
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java Fri Dec 28 03:36:59 2012
@@ -129,8 +129,8 @@ public class AdminUtil<T> {
     }
   }
   
-  public void prepDelete(ZooStore<T> zs, String path, String txidStr) {
-    checkGlobalLock(path);
+  public void prepDelete(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+    checkGlobalLock(zk, path);
     
     long txid = Long.parseLong(txidStr, 16);
     zs.reserve(txid);
@@ -138,8 +138,8 @@ public class AdminUtil<T> {
     zs.unreserve(txid, 0);
   }
   
-  public void prepFail(ZooStore<T> zs, String path, String txidStr) {
-    checkGlobalLock(path);
+  public void prepFail(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+    checkGlobalLock(zk, path);
     
     long txid = Long.parseLong(txidStr, 16);
     zs.reserve(txid);
@@ -163,9 +163,17 @@ public class AdminUtil<T> {
     }
   }
   
-  public void checkGlobalLock(String path) {
-    if (ZooLock.getLockData(path) != null) {
-      System.err.println("ERROR: Master lock is held, not running");
+  public void checkGlobalLock(IZooReaderWriter zk, String path) {
+    try {
+      if (ZooLock.getLockData(zk.getZooKeeper(), path) != null) {
+        System.err.println("ERROR: Master lock is held, not running");
+        System.exit(-1);
+      }
+    } catch (KeeperException e) {
+      System.err.println("ERROR: Could not read master lock, not running " + e.getMessage());
+      System.exit(-1);
+    } catch (InterruptedException e) {
+      System.err.println("ERROR: Could not read master lock, not running" + e.getMessage());
       System.exit(-1);
     }
   }

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AgeOffStore.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This store removes Repos, in the store it wraps, that are in a finished or new state for more than a configurable time period.    
+ * 
+ * No external time source is used.  It starts tracking idle time when its created.
+ * 
+ */
+public class AgeOffStore<T> implements TStore<T> {
+  
+  public static interface TimeSource {
+    long currentTimeMillis();
+  }
+
+  final private static Logger log = Logger.getLogger(AgeOffStore.class);
+
+  private TStore<T> store;
+  private Map<Long,Long> candidates;
+  private long ageOffTime;
+  private long minTime;
+  private TimeSource timeSource;
+  
+  private synchronized void updateMinTime() {
+    minTime = Long.MAX_VALUE;
+    
+    for (Long time : candidates.values()) {
+      if (time < minTime)
+        minTime = time;
+    }
+  }
+  
+  private synchronized void addCandidate(long txid) {
+    long time = timeSource.currentTimeMillis();
+    candidates.put(txid, time);
+    if (time < minTime)
+      minTime = time;
+  }
+  
+  private synchronized void removeCandidate(long txid) {
+    Long time = candidates.remove(txid);
+    if (time != null && time <= minTime)
+      updateMinTime();
+  }
+  
+  public void ageOff() {
+    HashSet<Long> oldTxs = new HashSet<Long>();
+
+    synchronized (this) {
+      long time = timeSource.currentTimeMillis();
+      if (minTime < time && time - minTime >= ageOffTime) {
+        for (Entry<Long,Long> entry : candidates.entrySet()) {
+          if (time - entry.getValue() >= ageOffTime) {
+            oldTxs.add(entry.getKey());
+          }
+        }
+        
+        candidates.keySet().removeAll(oldTxs);
+        updateMinTime();
+      }
+    }
+    
+    for (Long txid : oldTxs) {
+      try {
+        store.reserve(txid);
+        try {
+          switch (store.getStatus(txid)) {
+            case NEW:
+            case FAILED:
+            case SUCCESSFUL:
+              store.delete(txid);
+              log.debug("Aged off FATE tx " + String.format("%016x", txid));
+              break;
+          }
+          
+        } finally {
+          store.unreserve(txid, 0);
+        }
+      } catch (Exception e) {
+        log.warn("Failed to age off FATE tx " + String.format("%016x", txid), e);
+      }
+    }
+  }
+  
+  public AgeOffStore(TStore<T> store, long ageOffTime, TimeSource timeSource) {
+    this.store = store;
+    this.ageOffTime = ageOffTime;
+    this.timeSource = timeSource;
+    candidates = new HashMap<Long,Long>();
+    
+    minTime = Long.MAX_VALUE;
+
+    List<Long> txids = store.list();
+    for (Long txid : txids) {
+      store.reserve(txid);
+      try {
+        switch (store.getStatus(txid)) {
+          case NEW:
+          case FAILED:
+          case SUCCESSFUL:
+            addCandidate(txid);
+        }
+      } finally {
+        store.unreserve(txid, 0);
+      }
+    }
+  }
+  
+  public AgeOffStore(TStore<T> store, long ageOffTime) {
+    this(store, ageOffTime, new TimeSource() {
+      @Override
+      public long currentTimeMillis() {
+        return System.currentTimeMillis();
+      }
+    });
+  }
+
+  @Override
+  public long create() {
+    long txid = store.create();
+    addCandidate(txid);
+    return txid;
+  }
+  
+  @Override
+  public long reserve() {
+    return store.reserve();
+  }
+  
+  @Override
+  public void reserve(long tid) {
+    store.reserve(tid);
+  }
+  
+  @Override
+  public void unreserve(long tid, long deferTime) {
+    store.unreserve(tid, deferTime);
+  }
+  
+  @Override
+  public Repo<T> top(long tid) {
+    return store.top(tid);
+  }
+  
+  @Override
+  public void push(long tid, Repo<T> repo) throws StackOverflowException {
+    store.push(tid, repo);
+  }
+  
+  @Override
+  public void pop(long tid) {
+    store.pop(tid);
+  }
+  
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
+    return store.getStatus(tid);
+  }
+  
+  @Override
+  public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
+    store.setStatus(tid, status);
+
+    switch (status) {
+      case IN_PROGRESS:
+      case FAILED_IN_PROGRESS:
+        removeCandidate(tid);
+        break;
+      case FAILED:
+      case SUCCESSFUL:
+        addCandidate(tid);
+        break;
+    }
+  }
+  
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
+    return store.waitForStatusChange(tid, expected);
+  }
+  
+  @Override
+  public void setProperty(long tid, String prop, Serializable val) {
+    store.setProperty(tid, prop, val);
+  }
+  
+  @Override
+  public Serializable getProperty(long tid, String prop) {
+    return store.getProperty(tid, prop);
+  }
+  
+  @Override
+  public void delete(long tid) {
+    store.delete(tid);
+    removeCandidate(tid);
+  }
+  
+  @Override
+  public List<Long> list() {
+    return store.list();
+  }
+}

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/TStore.java Fri Dec 28 03:36:59 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.fate;
 
 import java.io.Serializable;
 import java.util.EnumSet;
+import java.util.List;
 
 /**
  * Transaction Store: a place to save transactions
@@ -129,4 +130,12 @@ public interface TStore<T> {
    */
   public void delete(long tid);
   
+  /**
+   * list all transaction ids in store
+   * 
+   * @return
+   */
+
+  public List<Long> list();
+
 }

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java Fri Dec 28 03:36:59 2012
@@ -424,6 +424,7 @@ public class ZooStore<T> implements TSto
     }
   }
   
+  @Override
   public List<Long> list() {
     try {
       ArrayList<Long> l = new ArrayList<Long>();

Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/AgeOffStoreTest.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.accumulo.fate.AgeOffStore.TimeSource;
+import org.apache.accumulo.fate.TStore.TStatus;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class AgeOffStoreTest {
+  
+  private static class TestTimeSource implements TimeSource {
+    long time = 0;
+    
+    public long currentTimeMillis() {
+      return time;
+    }
+    
+  }
+
+  @Test
+  public void testBasic() {
+
+    TestTimeSource tts = new TestTimeSource();
+    SimpleStore<String> sstore = new SimpleStore<String>();
+    AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
+    
+    aoStore.ageOff();
+
+    Long txid1 = aoStore.create();
+    aoStore.reserve(txid1);
+    aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
+    aoStore.unreserve(txid1, 0);
+    
+    aoStore.ageOff();
+
+    Long txid2 = aoStore.create();
+    aoStore.reserve(txid2);
+    aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
+    aoStore.setStatus(txid2, TStatus.FAILED);
+    aoStore.unreserve(txid2, 0);
+    
+    tts.time = 6;
+
+    Long txid3 = aoStore.create();
+    aoStore.reserve(txid3);
+    aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
+    aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
+    aoStore.unreserve(txid3, 0);
+    
+    Long txid4 = aoStore.create();
+    
+    aoStore.ageOff();
+
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+    
+    tts.time = 15;
+    
+    aoStore.ageOff();
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(3, new HashSet<Long>(aoStore.list()).size());
+    
+    tts.time = 30;
+
+    aoStore.ageOff();
+
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+  }
+  
+  @Test
+  public void testNonEmpty() {
+    // test age off when source store starts off non empty
+    
+    TestTimeSource tts = new TestTimeSource();
+    SimpleStore<String> sstore = new SimpleStore<String>();
+    Long txid1 = sstore.create();
+    sstore.reserve(txid1);
+    sstore.setStatus(txid1, TStatus.IN_PROGRESS);
+    sstore.unreserve(txid1, 0);
+    
+    Long txid2 = sstore.create();
+    sstore.reserve(txid2);
+    sstore.setStatus(txid2, TStatus.IN_PROGRESS);
+    sstore.setStatus(txid2, TStatus.FAILED);
+    sstore.unreserve(txid2, 0);
+    
+    Long txid3 = sstore.create();
+    sstore.reserve(txid3);
+    sstore.setStatus(txid3, TStatus.IN_PROGRESS);
+    sstore.setStatus(txid3, TStatus.SUCCESSFUL);
+    sstore.unreserve(txid3, 0);
+    
+    Long txid4 = sstore.create();
+    
+    AgeOffStore<String> aoStore = new AgeOffStore<String>(sstore, 10, tts);
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+    
+    aoStore.ageOff();
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1, txid2, txid3, txid4)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(4, new HashSet<Long>(aoStore.list()).size());
+    
+    tts.time = 15;
+
+    aoStore.ageOff();
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+    
+    aoStore.reserve(txid1);
+    aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
+    aoStore.unreserve(txid1, 0);
+    
+    tts.time = 30;
+    
+    aoStore.ageOff();
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+    
+    aoStore.reserve(txid1);
+    aoStore.setStatus(txid1, TStatus.FAILED);
+    aoStore.unreserve(txid1, 0);
+    
+    aoStore.ageOff();
+    
+    Assert.assertEquals(new HashSet<Long>(Arrays.asList(txid1)), new HashSet<Long>(aoStore.list()));
+    Assert.assertEquals(1, new HashSet<Long>(aoStore.list()).size());
+    
+    tts.time = 42;
+    
+    aoStore.ageOff();
+
+    Assert.assertEquals(0, new HashSet<Long>(aoStore.list()).size());
+  }
+}

Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java?rev=1426384&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/SimpleStore.java Fri Dec 28 03:36:59 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * 
+ */
+public class SimpleStore<T> implements TStore<T> {
+  
+  private long nextId = 1;
+  private Map<Long,TStatus> statuses = new HashMap<Long,TStore.TStatus>();
+  private Set<Long> reserved = new HashSet<Long>();
+  
+  @Override
+  public long create() {
+    statuses.put(nextId, TStatus.NEW);
+    return nextId++;
+  }
+  
+  @Override
+  public long reserve() {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void reserve(long tid) {
+    if (reserved.contains(tid))
+      throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve twice... if test change, then change this
+    reserved.add(tid);
+  }
+  
+  @Override
+  public void unreserve(long tid, long deferTime) {
+    if (!reserved.remove(tid)) {
+      throw new IllegalStateException();
+    }
+  }
+  
+  @Override
+  public Repo<T> top(long tid) {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void push(long tid, Repo<T> repo) throws StackOverflowException {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void pop(long tid) {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
+    if (!reserved.contains(tid))
+      throw new IllegalStateException();
+    
+    TStatus status = statuses.get(tid);
+    if (status == null)
+      return TStatus.UNKNOWN;
+    return status;
+  }
+  
+  @Override
+  public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
+    if (!reserved.contains(tid))
+      throw new IllegalStateException();
+    if (!statuses.containsKey(tid))
+      throw new IllegalStateException();
+    statuses.put(tid, status);
+  }
+  
+  @Override
+  public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid, EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void setProperty(long tid, String prop, Serializable val) {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public Serializable getProperty(long tid, String prop) {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void delete(long tid) {
+    if (!reserved.contains(tid))
+      throw new IllegalStateException();
+    statuses.remove(tid);
+  }
+  
+  @Override
+  public List<Long> list() {
+    return new ArrayList<Long>(statuses.keySet());
+  }
+  
+}

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java Fri Dec 28 03:36:59 2012
@@ -20,12 +20,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.AdminUtil;
 import org.apache.accumulo.fate.ZooStore;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 
@@ -53,7 +53,7 @@ public class Admin {
   }
   
   @Parameters(commandDescription="List the existing FATE transactions")
-  static class ListOpts {
+  static class PrintOpts {
   }
   
   public static void main(String[] args) throws Exception {
@@ -64,7 +64,7 @@ public class Admin {
     jc.addCommand("fail", fail);
     DeleteOpts deleteOpts = new DeleteOpts();
     jc.addCommand("delete", deleteOpts);
-    jc.addCommand("list", new ListOpts());
+    jc.addCommand("print", new PrintOpts());
     jc.parse(args);
     if (opts.help || jc.getParsedCommand() == null) {
       jc.usage();
@@ -80,9 +80,9 @@ public class Admin {
     ZooStore<Master> zs = new ZooStore<Master>(path, zk);
     
     if (jc.getParsedCommand().equals("fail")) {
-      admin.prepFail(zs, masterPath, args[1]);
+      admin.prepFail(zs, zk, masterPath, args[1]);
     } else if (jc.getParsedCommand().equals("delete")) {
-      admin.prepDelete(zs, masterPath, args[1]);
+      admin.prepDelete(zs, zk, masterPath, args[1]);
       admin.deleteLocks(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, args[1]);
     } else if (jc.getParsedCommand().equals("print")) {
       admin.print(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1426384&r1=1426383&r2=1426384&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri Dec 28 03:36:59 2012
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AgeOffStore;
 import org.apache.accumulo.fate.Fate;
 import org.apache.accumulo.fate.TStore.TStatus;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -2153,8 +2154,18 @@ public class Master implements LiveTServ
     
     // TODO: add shutdown for fate object
     try {
-      fate = new Fate<Master>(this, new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
-          ZooReaderWriter.getRetryingInstance()), 4);
+      final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
+          ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
+      
+      fate = new Fate<Master>(this, store, 4);
+      
+      SimpleTimer.getInstance().schedule(new TimerTask() {
+        
+        @Override
+        public void run() {
+          store.ageOff();
+        }
+      }, 63000, 63000);
     } catch (KeeperException e) {
       throw new IOException(e);
     } catch (InterruptedException e) {



Mime
View raw message