accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/4] accumulo git commit: ACCUMULO-3423 use zookeeper to track WAL state
Date Thu, 21 May 2015 16:47:07 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master cf9b9a4ea -> 47d1a4db4


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 9fcfec9..9af60dc 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -18,11 +18,8 @@ package org.apache.accumulo.gc.replication;
 
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,9 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -49,7 +44,6 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
@@ -70,8 +64,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
 
 public class CloseWriteAheadLogReferencesTest {
@@ -124,130 +116,6 @@ public class CloseWriteAheadLogReferencesTest {
   }
 
   @Test
-  public void findOneWalFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    data.add(entry("tserver1:9997[1234567890]", file));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>()
{
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(file), wals);
-
-    verify(conn, bs);
-  }
-
-  // This is a silly test now
-  @Test
-  public void findManyRefsToSingleWalFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-
-    String uuid = UUID.randomUUID().toString();
-
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
-    data.add(entry("tserver1:9997[0123456789]", filename));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>()
{
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(filename), wals);
-
-    verify(conn, bs);
-  }
-
-  @Test
-  public void findRefsToManyWalsFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-
-    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
-    String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
-
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-
-    data.add(entry("tserver1:9997[1234567890]", file1));
-    data.add(entry("tserver2:9997[1234567891]", file2));
-    data.add(entry("tserver3:9997[1234567891]", file3));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>()
{
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Sets.newHashSet(file1, file2, file3), wals);
-
-    verify(conn, bs);
-  }
-
-  private static Entry<Key,Value> entry(String session, String file) {
-    Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF,
new Text(file));
-    return Maps.immutableEntry(key, new Value());
-  }
-
-  @Test
   public void unusedWalsAreClosed() throws Exception {
     Set<String> wals = Collections.emptySet();
     Instance inst = new MockInstance(testName.getMethodName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 2b874f6..3ff1aa9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.master.state.TableStats;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.log.WalMarker;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.ClosableIterator;
@@ -82,6 +83,7 @@ import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
@@ -127,6 +129,8 @@ class TabletGroupWatcher extends Daemon {
     int[] oldCounts = new int[TabletState.values().length];
     EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
 
+    WalMarker wals = new WalMarker(master.getInstance(), ZooReaderWriter.getInstance());
+
     while (this.master.stillMaster()) {
       // slow things down a little, otherwise we spam the logs when there are many wake-up
events
       UtilWaitThread.sleep(100);
@@ -242,7 +246,10 @@ class TabletGroupWatcher extends Daemon {
                 assignedToDeadServers.add(tls);
                 if (server.equals(this.master.migrations.get(tls.extent)))
                   this.master.migrations.remove(tls.extent);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(),
tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                TServerInstance tserver = tls.futureOrCurrent();
+                if (!logsForDeadServers.containsKey(tserver)) {
+                  logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
+                }
                 break;
               case UNASSIGNED:
                 // maybe it's a finishing migration
@@ -276,7 +283,9 @@ class TabletGroupWatcher extends Daemon {
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
                 assignedToDeadServers.add(tls);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(),
tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) {
+                  logsForDeadServers.put(tls.futureOrCurrent(), wals.getWalsInUse(tls.futureOrCurrent()));
+                }
                 break;
               case HOSTED:
                 TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -296,7 +305,6 @@ class TabletGroupWatcher extends Daemon {
         }
 
         flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers,
unassigned);
-        store.markLogsAsUnused(master, logsForDeadServers);
 
         // provide stats after flushing changes to avoid race conditions w/ delete table
         stats.end(masterState);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7154732..23a4b34 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,6 +161,8 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -319,6 +321,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
   private final ServerConfigurationFactory confFactory;
 
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
+  private final WalMarker walMarker;
 
   public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
     super(confFactory);
@@ -364,6 +367,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
         TabletLocator.clearLocators();
       }
     }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
+    walMarker = new WalMarker(instance, ZooReaderWriter.getInstance());
 
     // Create the secret manager
     setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
@@ -2381,6 +2385,12 @@ public class TabletServer extends AccumuloServerContext implements
Runnable {
       throw new RuntimeException("Failed to start the tablet client service", e1);
     }
     announceExistence();
+    try {
+      walMarker.initWalMarker(getTabletSession());
+    } catch (Exception e) {
+      log.error("Unable to create WAL marker node in zookeeper", e);
+      throw new RuntimeException(e);
+    }
 
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
"distributed work queue");
 
@@ -3020,39 +3030,31 @@ public class TabletServer extends AccumuloServerContext implements
Runnable {
       candidates.removeAll(tablet.getCurrentLogFiles());
     }
     try {
-      Set<Path> filenames = new HashSet<>();
+      TServerInstance session = this.getTabletSession();
       for (DfsLogger candidate : candidates) {
-        filenames.add(candidate.getPath());
+        log.info("Marking " + candidate.getPath() + " as unreferenced");
+        walMarker.walUnreferenced(session, candidate.getPath());
       }
-      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames);
       synchronized (closedLogs) {
         closedLogs.removeAll(candidates);
       }
-    } catch (AccumuloException ex) {
+    } catch (WalMarkerException ex) {
       log.info(ex.toString(), ex);
     }
   }
 
-  public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) {
-    // serialize the updates to the metadata per level: avoids updating the level more than
once
-    // updating one level, may cause updates to other levels, so we need to release the lock
on metadataTableLogs
-    synchronized (levelLocks[level.ordinal()]) {
-      EnumSet<TabletLevel> set = null;
-      set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
-      if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
-        log.info("Writing log marker for level " + level + " " + copy.getFileName());
-        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(),
copy.getPath(), level);
-      }
-      set = metadataTableLogs.get(copy);
-      set.add(level);
-    }
+  public void addNewLogMarker(DfsLogger copy) throws WalMarkerException {
+    log.info("Writing log marker for " + copy.getPath());
+    walMarker.addNewWalMarker(getTabletSession(), copy.getPath());
   }
 
-  public void walogClosed(DfsLogger currentLog) {
+  public void walogClosed(DfsLogger currentLog) throws WalMarkerException {
     metadataTableLogs.remove(currentLog);
     synchronized (closedLogs) {
       closedLogs.add(currentLog);
     }
+    log.info("Marking " + currentLog.getPath() + " as closed");
+    walMarker.closeWal(getTabletSession(), currentLog.getPath());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 809c56a..4836d99 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -41,7 +41,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.TabletLevel;
+import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
@@ -235,7 +235,7 @@ public class TabletServerLogger {
       return;
     }
     nextLogMaker = new SimpleThreadPool(1, "WALog creator");
-    nextLogMaker.submit(new Runnable() {
+    nextLogMaker.submit(new LoggingRunnable(log, new Runnable() {
       @Override
       public void run() {
         final ServerResources conf = tserver.getServerConfig();
@@ -248,6 +248,7 @@ public class TabletServerLogger {
             alog.open(tserver.getClientAddressString());
             String fileName = alog.getFileName();
             log.debug("Created next WAL " + fileName);
+            tserver.addNewLogMarker(alog);
             while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
               log.info("Our WAL was not used for 12 hours: " + fileName);
             }
@@ -280,7 +281,7 @@ public class TabletServerLogger {
           }
         }
       }
-    });
+    }));
   }
 
   public void resetLoggers() throws IOException {
@@ -348,8 +349,6 @@ public class TabletServerLogger {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
                 defineTablet(commitSession);
-                if (currentLogId == logId.get())
-                  tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent()));
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
index 03d783c..b1c010c 100644
--- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -18,7 +18,9 @@ package org.apache.accumulo.test;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -30,10 +32,12 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -118,13 +122,12 @@ public class UnusedWALIT extends ConfigurableMacIT {
   }
 
   private int getWALCount(Connector c) throws Exception {
-    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    try {
-      return Iterators.size(s.iterator());
-    } finally {
-      s.close();
+    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    int result = 0;
+    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet())
{
+      result += entry.getValue().size();
     }
+    return result;
   }
 
   private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int
startCol, int colCount) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 2b24219..8f4fe75 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -65,7 +65,10 @@ import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -375,8 +378,7 @@ public class VolumeIT extends ConfigurableMacIT {
     bw.close();
   }
 
-  private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws
AccumuloException, AccumuloSecurityException,
-      TableExistsException, TableNotFoundException, MutationsRejectedException {
+  private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws
Exception {
 
     Connector conn = getConnector();
 
@@ -426,19 +428,14 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
-    Text path = new Text();
-    for (String table : new String[] {RootTable.NAME, MetadataTable.NAME}) {
-      Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
-      meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      outer: for (Entry<Key,Value> entry : meta) {
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        for (int i = 0; i < paths.length; i++) {
-          if (path.toString().startsWith(paths[i].toString())) {
-            continue outer;
-          }
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      for (Path path : paths) {
+        if (entry.getKey().toString().startsWith(path.toString())) {
+          continue outer;
         }
-        Assert.fail("Unexpected volume " + path);
       }
+      Assert.fail("Unexpected volume " + entry.getKey());
     }
 
     // if a volume is chosen randomly for each tablet, then the probability that a volume
will not be chosen for any tablet is ((num_volumes -

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index f2ceb2c..22e2930 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -45,17 +45,18 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.master.state.SetGoalState;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.WatchedEvent;
@@ -199,23 +200,10 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
 
   private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception
{
     Map<String,Boolean> result = new HashMap<>();
-    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
-    root.setRange(CurrentLogsSection.getRange());
-    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
-    meta.setRange(root.getRange());
-    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
-    while (both.hasNext()) {
-      Entry<Key,Value> entry = both.next();
-      Text path = new Text();
-      CurrentLogsSection.getPath(entry.getKey(), path);
-      result.put(path.toString(), entry.getValue().get().length == 0);
-    }
-    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-    List<String> children = zoo.getChildren(zpath, null);
-    for (String child : children) {
-      byte[] data = zoo.getData(zpath + "/" + child, null, null);
-      LogEntry entry = LogEntry.fromBytes(data);
-      result.put(entry.filename, true);
+    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      // WALs are in use if they are not unreferenced
+      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 62ed9c2..47873f6 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -47,7 +46,10 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -103,20 +105,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
-    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    s.fetchColumnFamily(CurrentLogsSection.COLF);
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
 
-    Set<String> wals = new HashSet<String>();
-    for (Entry<Key,Value> entry : s) {
-      log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-      // hostname:port/uri://path/to/wal
-      String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
-      log.debug("Extracted file: " + path);
-      wals.add(path);
+    Set<String> result = new HashSet<String>();
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue());
+      result.add(entry.getKey().toString());
     }
-
-    return wals;
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 54b42f4..ba68cc2 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.Constants;
@@ -65,7 +66,7 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -74,6 +75,8 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
@@ -81,6 +84,7 @@ import org.apache.accumulo.server.replication.StatusFormatter;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -131,7 +135,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException
{
+  private Multimap<String,String> getLogs(Connector conn) throws Exception {
     // Map of server to tableId
     Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
     Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -144,20 +148,13 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
     // Map of logs to tableId
     Multimap<String,String> logs = HashMultimap.create();
-    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
-    for (Entry<Key,Value> entry : scanner) {
-      if (Thread.interrupted()) {
-        return logs;
-      }
-      Text path = new Text();
-      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-      Text session = new Text();
-      Text hostPort = new Text();
-      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort, session);
-      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(),
false), session.toString());
-      for (String tableId : serverToTableID.get(server)) {
-        logs.put(new Path(path.toString()).toString(), tableId);
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet())
{
+      for (UUID id : entry.getValue()) {
+        Pair<WalState,Path> state = wals.state(entry.getKey(), id);
+        for (String tableId : serverToTableID.get(entry.getKey())) {
+          logs.put(state.getSecond().toString(), tableId);
+        }
       }
     }
     return logs;
@@ -308,16 +305,11 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     Set<String> wals = Sets.newHashSet();
-    Scanner s;
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
-      for (Entry<Key,Value> entry : s) {
-        Text path = new Text();
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        wals.add(new Path(path.toString()).toString());
+      WalMarker markers = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+      for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
+        wals.add(entry.getKey().toString());
       }
       attempts--;
     }
@@ -511,8 +503,8 @@ public class ReplicationIT extends ConfigurableMacIT {
         while (keepRunning.get()) {
           try {
             logs.putAll(getLogs(conn));
-          } catch (TableNotFoundException e) {
-            log.error("Metadata table doesn't exist");
+          } catch (Exception e) {
+            log.error("Error getting logs", e);
           }
         }
       }


Mime
View raw message