accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: Refactor FATE AdminUtil spliting getStatus method. (#891)
Date Thu, 14 Feb 2019 18:13:06 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 739ca1c  Refactor FATE AdminUtil spliting getStatus method. (#891)
739ca1c is described below

commit 739ca1c1023d36819ed9de7155dcf25272e548ad
Author: EdColeman <dev1@etcoleman.com>
AuthorDate: Thu Feb 14 13:13:01 2019 -0500

    Refactor FATE AdminUtil spliting getStatus method. (#891)
    
    Refactor AdminUtil to split the existing fate status method into two - one
that provides the existing behaviour and a second
    that does not need to process lock information when that information is not required.
This refactoring is a step to providing FATE metrics
    where a count of existing FATES does not need the lock information.
    * Adds unit test
    * Includes code analysis clean-up.
    
    * AdminUtil_refactor - updated with pull request comments
    
    This refactoring is to provide a new lighter weight method to get FATE status
    for metrics. This commit tries to address pull request comments for review.
    There may be additional clean-up before final commit, but wanted to make sure
    this was on track.
    
    * Updated to address latest comments on pull request.
    
    Modified the utility and the tests to address comments by keith-turner.
    
    * Corrected log statement that was printing timing info before actual call.
    
    This addresses a comment by Christopher that caught a log statement that
    was inadvertently moved as part of clean-up.
    
    * AdminUtil_refactor - renamed test to clarify the types of IT tests run.
    
    * corrected javadoc formatting
---
 .../java/org/apache/accumulo/fate/AdminUtil.java   | 130 +++++++++++++++++--
 ...leChangeStateIT.java => FateConcurrencyIT.java} | 139 ++++++++++++++++++---
 2 files changed, 240 insertions(+), 29 deletions(-)

diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index 1ea9a88..2b37302 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 public class AdminUtil<T> {
   private static final Logger log = LoggerFactory.getLogger(AdminUtil.class);
 
-  private boolean exitOnError = false;
+  private final boolean exitOnError;
 
   /**
    * Default constructor
@@ -63,6 +63,9 @@ public class AdminUtil<T> {
     this.exitOnError = exitOnError;
   }
 
+  /**
+   * FATE transaction status, including lock information.
+   */
   public static class TransactionStatus {
 
     private final long txid;
@@ -74,12 +77,14 @@ public class AdminUtil<T> {
 
     private TransactionStatus(Long tid, TStatus status, String debug, List<String>
hlocks,
         List<String> wlocks, String top) {
+
       this.txid = tid;
       this.status = status;
       this.debug = debug;
       this.hlocks = Collections.unmodifiableList(hlocks);
       this.wlocks = Collections.unmodifiableList(wlocks);
       this.top = top;
+
     }
 
     /**
@@ -121,7 +126,6 @@ public class AdminUtil<T> {
     public String getTop() {
       return top;
     }
-
   }
 
   public static class FateStatus {
@@ -172,7 +176,7 @@ public class AdminUtil<T> {
     }
 
     /**
-     * Get locks that are waiting to be aquired by non existent FATE transactions. These
are table
+     * Get locks that are waiting to be acquired by non existent FATE transactions. These
are table
      * or namespace locks.
      *
      * @return map where keys are transaction ids and values are a list of table IDs and/or
@@ -184,16 +188,88 @@ public class AdminUtil<T> {
     }
   }
 
+  /**
+   * Returns a list of the FATE transactions, optionally filtered by transaction id and status.
This
+   * method does not process lock information; if lock information is desired, use
+   * {@link #getStatus(ReadOnlyTStore, IZooReader, String, Set, EnumSet)}
+   *
+   * @param zs
+   *          read-only zoostore
+   * @param filterTxid
+   *          filter results to include only the provided transaction ids.
+   * @param filterStatus
+   *          filter results to include only provided status types
+   * @return list of FATE transactions that match filter criteria
+   */
+  public List<TransactionStatus> getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long>
filterTxid,
+      EnumSet<TStatus> filterStatus) {
+
+    FateStatus status = getTransactionStatus(zs, filterTxid, filterStatus,
+        Collections.<Long,List<String>> emptyMap(), Collections.<Long,List<String>>
emptyMap());
+
+    return status.getTransactions();
+  }
+
+  /**
+   * Get the FATE transaction status and lock information stored in zookeeper, optionally
filtered
+   * by transaction id and filter status.
+   *
+   * @param zs
+   *          read-only zoostore
+   * @param zk
+   *          zookeeper reader.
+   * @param lockPath
+   *          the zookeeper path for locks
+   * @param filterTxid
+   *          filter results to include only the provided transaction ids.
+   * @param filterStatus
+   *          filter results to include only provided status types
+   * @return a summary container of the fate transactions.
+   * @throws KeeperException
+   *           if zookeeper exception occurs
+   * @throws InterruptedException
+   *           if process is interrupted.
+   */
   public FateStatus getStatus(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath,
       Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
       throws KeeperException, InterruptedException {
+
     Map<Long,List<String>> heldLocks = new HashMap<>();
     Map<Long,List<String>> waitingLocks = new HashMap<>();
 
+    findLocks(zk, lockPath, heldLocks, waitingLocks);
+
+    return getTransactionStatus(zs, filterTxid, filterStatus, heldLocks, waitingLocks);
+
+  }
+
+  /**
+   * Walk through the lock nodes in zookeeper to find and populate held locks and waiting
locks.
+   *
+   * @param zk
+   *          zookeeper reader
+   * @param lockPath
+   *          the zookeeper path for locks
+   * @param heldLocks
+   *          map for returning transactions with held locks
+   * @param waitingLocks
+   *          map for returning transactions with waiting locks
+   * @throws KeeperException
+   *           if initial lock list cannot be read.
+   * @throws InterruptedException
+   *           if thread interrupt detected while processing.
+   */
+  private void findLocks(IZooReader zk, final String lockPath,
+      final Map<Long,List<String>> heldLocks, final Map<Long,List<String>>
waitingLocks)
+      throws KeeperException, InterruptedException {
+
+    // stop with exception if lock ids cannot be retrieved from zookeeper
     List<String> lockedIds = zk.getChildren(lockPath);
 
     for (String id : lockedIds) {
+
       try {
+
         List<String> lockNodes = zk.getChildren(lockPath + "/" + id);
         lockNodes = new ArrayList<>(lockNodes);
         Collections.sort(lockNodes);
@@ -204,7 +280,7 @@ public class AdminUtil<T> {
         for (String node : lockNodes) {
           try {
             byte[] data = zk.getData(lockPath + "/" + id + "/" + node, null);
-            String lda[] = new String(data, UTF_8).split(":");
+            String[] lda = new String(data, UTF_8).split(":");
 
             if (lda[0].charAt(0) == 'W')
               sawWriteLock = true;
@@ -235,10 +311,40 @@ public class AdminUtil<T> {
           pos++;
         }
 
-      } catch (Exception e) {
-        log.error("Failed to read locks for " + id + " continuing.", e);
+      } catch (KeeperException ex) {
+        /*
+         * could be transient zk error. Log, but try to process rest of list rather than
throwing
+         * exception here
+         */
+        log.error("Failed to read locks for " + id + " continuing.", ex);
+
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw ex;
       }
     }
+  }
+
+  /**
+   * Returns fate status, possibly filtered
+   *
+   * @param zs
+   *          read-only access to a populated transaction store.
+   * @param filterTxid
+   *          Optional. List of transactions to filter results - if null, all transactions
are
+   *          returned
+   * @param filterStatus
+   *          Optional. List of status types to filter results - if null, all transactions
are
+   *          returned.
+   * @param heldLocks
+   *          populated list of locks held by transaction - or an empty map if none.
+   * @param waitingLocks
+   *          populated list of locks waiting to be acquired by transaction - or an empty map if none.
+   * @return current fate and lock status
+   */
+  private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
+      EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
+      Map<Long,List<String>> waitingLocks) {
 
     List<Long> transactions = zs.list();
     List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
@@ -250,20 +356,23 @@ public class AdminUtil<T> {
       String debug = (String) zs.getProperty(tid, "debug");
 
       List<String> hlocks = heldLocks.remove(tid);
-      if (hlocks == null)
+
+      if (hlocks == null) {
         hlocks = Collections.emptyList();
+      }
 
       List<String> wlocks = waitingLocks.remove(tid);
-      if (wlocks == null)
+
+      if (wlocks == null) {
         wlocks = Collections.emptyList();
+      }
 
       String top = null;
       ReadOnlyRepo<T> repo = zs.top(tid);
       if (repo != null)
         top = repo.getDescription();
 
-      TStatus status = null;
-      status = zs.getStatus(tid);
+      TStatus status = zs.getStatus(tid);
 
       zs.unreserve(tid, 0);
 
@@ -275,6 +384,7 @@ public class AdminUtil<T> {
     }
 
     return new FateStatus(statuses, heldLocks, waitingLocks);
+
   }
 
   public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
similarity index 79%
rename from test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
rename to test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 2d21488..b9829df 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -55,28 +55,53 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * ACCUMULO-4574. Test to verify that changing table state to online / offline
+ * IT Tests that create / run a "slow" FATE transaction so that other operations can be checked.
+ * Included tests for:
+ * <ul>
+ * <li>ACCUMULO-4574. Test to verify that changing table state to online / offline
  * {@link org.apache.accumulo.core.client.admin.TableOperations#online(String)} when the
table is
- * already in that state returns without blocking.
+ * already in that state returns without blocking.</li>
+ * <li>AdminUtil refactor to provide methods that provide FATE status, one with table
lock info
+ * (original) and additional method without.</li>
+ *</ul>
  */
-public class TableChangeStateIT extends AccumuloClusterHarness {
+public class FateConcurrencyIT extends AccumuloClusterHarness {
 
-  private static final Logger log = LoggerFactory.getLogger(TableChangeStateIT.class);
+  private static final Logger log = LoggerFactory.getLogger(FateConcurrencyIT.class);
 
   private static final int NUM_ROWS = 1000;
-  private static final long SLOW_SCAN_SLEEP_MS = 100L;
+  private static final long SLOW_SCAN_SLEEP_MS = 250L;
 
   private Connector connector;
 
+  private static final ExecutorService pool = Executors.newCachedThreadPool();
+
+  private String tableName;
+
+  private String secret;
+
   @Before
   public void setup() {
+
     connector = getConnector();
+
+    tableName = getUniqueNames(1)[0];
+
+    secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
+
+    createData(tableName);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    pool.shutdownNow();
   }
 
   @Override
@@ -97,12 +122,6 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
   @Test
   public void changeTableStateTest() throws Exception {
 
-    ExecutorService pool = Executors.newCachedThreadPool();
-
-    String tableName = getUniqueNames(1)[0];
-
-    createData(tableName);
-
     assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
 
     OnLineCallable onlineOp = new OnLineCallable(tableName);
@@ -135,9 +154,7 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
     // launch a full table compaction with the slow iterator to ensure table lock is acquired
and
     // held by the compaction
 
-    Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
-    assertTrue("verify that compaction running and fate transaction exists",
-        blockUntilCompactionRunning(tableName));
+    Future<?> compactTask = startCompactTask();
 
     // try to set online while fate transaction is in progress - before ACCUMULO-4574 this
would
     // block
@@ -174,6 +191,88 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
   }
 
   /**
+   * Validate that AdminUtil.getStatus works correctly after refactor and validate that
+   * getTransactionStatus can be called without lock map(s). The test starts a long running
fate
+   * transaction (slow compaction) and then calls AdminUtil functions to get the FATE status.
+   *
+   * @throws Exception
+   *           any exception is a test failure
+   */
+  @Test
+  public void getFateStatus() throws Exception {
+
+    assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
+
+    Future<?> compactTask = startCompactTask();
+
+    assertTrue("compaction fate transaction exits", findFate(tableName));
+
+    Instance instance = connector.getInstance();
+    AdminUtil<String> admin = new AdminUtil<>(false);
+
+    try {
+
+      String tableId = Tables.getTableId(instance, tableName);
+
+      log.trace("tid: {}", tableId);
+
+      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
+          instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE,
zk);
+
+      AdminUtil.FateStatus withLocks = admin.getStatus(zs, zk,
+          ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
+
+      // call method that does not use locks.
+      List<AdminUtil.TransactionStatus> noLocks = admin.getTransactionStatus(zs, null,
null);
+
+      // fast check - count number of transactions
+      assertEquals(withLocks.getTransactions().size(), noLocks.size());
+
+      int matchCount = 0;
+
+      for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
+
+        log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
+        if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange"))
{
+
+          for (AdminUtil.TransactionStatus tx2 : noLocks) {
+            if (tx2.getTxid().equals(tx.getTxid())) {
+              matchCount++;
+            }
+          }
+        }
+      }
+
+      assertTrue("Number of fates matches should be > 0", matchCount > 0);
+
+    } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
+      throw new IllegalStateException(ex);
+    }
+
+    // test complete, cancel compaction and move on.
+    connector.tableOperations().cancelCompaction(tableName);
+
+    // block if compaction still running
+    compactTask.get();
+
+  }
+
+  /**
+   * Create and run a slow running compaction task. The method will block until the compaction
has
+   * been started.
+   *
+   * @return a reference to the running compaction task.
+   */
+  private Future<?> startCompactTask() {
+    Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
+    assertTrue("verify that compaction running and fate transaction exists",
+        blockUntilCompactionRunning(tableName));
+    return compactTask;
+  }
+
+  /**
    * Blocks current thread until compaction is running.
    *
    * @return true if compaction and associate fate found.
@@ -231,15 +330,18 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
 
       log.trace("tid: {}", tableId);
 
-      String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
       IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
           instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
       ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE,
zk);
       AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk,
           ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
 
+      log.trace("current fates: {}", fateStatus.getTransactions().size());
+
       for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
 
+        log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
         if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange"))
{
           return true;
         }
@@ -291,8 +393,7 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
       // populate
       for (int i = 0; i < NUM_ROWS; i++) {
         Mutation m = new Mutation(new Text(String.format("%05d", i)));
-        m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"),
-            new Value("junk".getBytes(UTF_8)));
+        m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8)));
         bw.addMutation(m);
       }
       bw.close();
@@ -323,11 +424,11 @@ public class TableChangeStateIT extends AccumuloClusterHarness {
   }
 
   /**
-   * Provides timing information for oline operation.
+   * Provides timing information for online operation.
    */
   private static class OnlineOpTiming {
 
-    private long started = 0L;
+    private final long started;
     private long completed = 0L;
 
     OnlineOpTiming() {


Mime
View raw message