accumulo-commits mailing list archives

From: ctubb...@apache.org
Subject: [accumulo] 01/01: Merge branch '1.9' into 2.0
Date: Thu, 15 Aug 2019 19:48:14 GMT
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 6ae7ddb99d10635e713742f35fc1db0ce9332ba2
Merge: 95c92ed ab90032
Author: Christopher Tubbs <ctubbsii@apache.org>
AuthorDate: Thu Aug 15 15:45:02 2019 -0400

    Merge branch '1.9' into 2.0

 .../accumulo/gc/GarbageCollectWriteAheadLogs.java      | 10 ++++------
 .../org/apache/accumulo/gc/SimpleGarbageCollector.java | 15 ++++++++++++++-
 .../accumulo/gc/GarbageCollectWriteAheadLogsTest.java  | 18 ++++++++++++++++++
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 289e69d,7afcdc8..e671219
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -85,18 -91,26 +85,15 @@@ public class GarbageCollectWriteAheadLo
     * @param useTrash
     *          true to move files to trash rather than delete them
     */
-   GarbageCollectWriteAheadLogs(final ServerContext context, VolumeManager fs, boolean useTrash) {
 -  GarbageCollectWriteAheadLogs(final AccumuloServerContext context, final VolumeManager fs,
 -      final LiveTServerSet liveServers, boolean useTrash) throws IOException {
++  GarbageCollectWriteAheadLogs(final ServerContext context, final VolumeManager fs,
++      final LiveTServerSet liveServers, boolean useTrash) {
      this.context = context;
      this.fs = fs;
      this.useTrash = useTrash;
-     this.liveServers = new LiveTServerSet(context, (current, deleted, added) -> {
-       log.debug("New tablet servers noticed: {}", added);
-       log.debug("Tablet servers removed: {}", deleted);
-     });
-     liveServers.startListeningForTabletServerChanges();
+     this.liveServers = liveServers;
 -
 -    this.walMarker = new WalStateManager(context.getInstance(), ZooReaderWriter.getInstance());
 -    this.store = new Iterable<TabletLocationState>() {
 -      @Override
 -      public Iterator<TabletLocationState> iterator() {
 -        try {
 -          return Iterators.concat(new ZooTabletStateStore().iterator(),
 -              new RootTabletStateStore(context).iterator(),
 -              new MetaDataStateStore(context).iterator());
 -        } catch (DistributedStoreException e) {
 -          throw new RuntimeException(e);
 -        }
 -      }
 -    };
 +    this.walMarker = new WalStateManager(context);
 +    this.store = () -> Iterators.concat(new ZooTabletStateStore(context).iterator(),
 +        new RootTabletStateStore(context).iterator(), new MetaDataStateStore(context).iterator());
    }
  
    /**
@@@ -124,34 -138,31 +121,35 @@@
    }
  
    public void collect(GCStatus status) {
 -
 -    Span span = Trace.start("getCandidates");
      try {
 -      status.currentLog.started = System.currentTimeMillis();
 -
 -      Map<UUID,Path> recoveryLogs = getSortedWALogs();
 -
 -      Map<TServerInstance,Set<UUID>> logsByServer = new HashMap<>();
 -      Map<UUID,Pair<WalState,Path>> logsState = new HashMap<>();
 -      // Scan for log file info first: the order is important
 -      // Consider:
 -      // * get live servers
 -      // * new server gets a lock, creates a log
 -      // * get logs
 -      // * the log appears to belong to a dead server
 -      long count = getCurrent(logsByServer, logsState);
 -      long fileScanStop = System.currentTimeMillis();
 -
 -      log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count,
 -          logsByServer.size(), (fileScanStop - status.currentLog.started) / 1000.));
 -      status.currentLog.candidates = count;
 -      span.stop();
 +      long count;
 +      long fileScanStop;
 +      Map<TServerInstance,Set<UUID>> logsByServer;
 +      Map<UUID,Pair<WalState,Path>> logsState;
 +      Map<UUID,Path> recoveryLogs;
 +      try (TraceScope span = Trace.startSpan("getCandidates")) {
 +        status.currentLog.started = System.currentTimeMillis();
 +
 +        recoveryLogs = getSortedWALogs();
 +
 +        logsByServer = new HashMap<>();
 +        logsState = new HashMap<>();
 +        // Scan for log file info first: the order is important
 +        // Consider:
 +        // * get live servers
 +        // * new server gets a lock, creates a log
 +        // * get logs
 +        // * the log appears to belong to a dead server
 +        count = getCurrent(logsByServer, logsState);
 +        fileScanStop = System.currentTimeMillis();
 +
 +        log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count,
 +            logsByServer.size(), (fileScanStop - status.currentLog.started) / 1000.));
 +        status.currentLog.candidates = count;
 +      }
  
        // now it's safe to get the liveServers
+       liveServers.scanServers();
        Set<TServerInstance> currentServers = liveServers.getCurrentServers();
  
        Map<UUID,TServerInstance> uuidToTServer;
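
The constructor hunk above also swaps an anonymous Iterable<TabletLocationState> for a lambda.
That is legal because Iterable declares exactly one abstract method, iterator(), making it a
functional interface. A minimal, self-contained sketch of the idiom, using Guava's
Iterators.concat (already used in the diff); the class name and string data here are
hypothetical, not Accumulo types:

    import java.util.Arrays;

    import com.google.common.collect.Iterators;

    public class LambdaIterableSketch {
      public static void main(String[] args) {
        Iterable<String> first = Arrays.asList("root", "zoo");
        Iterable<String> second = Arrays.asList("metadata");

        // The lambda body runs on every iterator() call, so each for-each
        // loop rebuilds the concatenated view from its sources, mirroring
        // how the GC re-reads the tablet state stores on every pass.
        Iterable<String> combined =
            () -> Iterators.concat(first.iterator(), second.iterator());

        for (String s : combined) {
          System.out.println(s);
        }
      }
    }
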
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 147239a,24ce61c..2a0a19f
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -73,13 -82,21 +73,14 @@@ import org.apache.accumulo.fate.zookeep
  import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
  import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
  import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
 -import org.apache.accumulo.server.Accumulo;
 -import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.AbstractServer;
  import org.apache.accumulo.server.ServerConstants;
  import org.apache.accumulo.server.ServerOpts;
 -import org.apache.accumulo.server.client.HdfsZooInstance;
 -import org.apache.accumulo.server.conf.ServerConfigurationFactory;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.fs.VolumeManager.FileType;
 -import org.apache.accumulo.server.fs.VolumeManagerImpl;
  import org.apache.accumulo.server.fs.VolumeUtil;
+ import org.apache.accumulo.server.master.LiveTServerSet;
 -import org.apache.accumulo.server.master.state.TServerInstance;
 -import org.apache.accumulo.server.metrics.MetricsSystemHelper;
  import org.apache.accumulo.server.replication.proto.Replication.Status;
 -import org.apache.accumulo.server.rpc.RpcWrapper;
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
  import org.apache.accumulo.server.rpc.TServerUtils;
@@@ -487,71 -564,93 +488,83 @@@ public class SimpleGarbageCollector ext
      }
  
      ProbabilitySampler sampler =
 -        new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 +        TraceUtil.probabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
  
+     // This is created outside of the run loop and passed to the walogCollector so that
+     // only a single timed task is created (internal to LiveTServerSet using SimpleTimer).
 -    final LiveTServerSet liveTServerSet = new LiveTServerSet(this, new LiveTServerSet.Listener() {
 -      @Override
 -      public void update(LiveTServerSet current, Set<TServerInstance> deleted,
 -          Set<TServerInstance> added) {
 -
 -        log.debug("Number of current servers {}, tservers added {}, removed {}",
 -            current == null ? -1 : current.size(), added, deleted);
++    final LiveTServerSet liveTServerSet =
++        new LiveTServerSet(getContext(), (current, deleted, added) -> {
++          log.debug("Number of current servers {}, tservers added {}, removed {}",
++              current == null ? -1 : current.size(), added, deleted);
+ 
 -        if (log.isTraceEnabled()) {
 -          log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted);
 -        }
 -      }
 -    });
++          if (log.isTraceEnabled()) {
++            log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted);
++          }
++        });
+ 
      while (true) {
 -      Trace.on("gc", sampler);
 -
 -      Span gcSpan = Trace.start("loop");
 -      tStart = System.currentTimeMillis();
 -      try {
 -        System.gc(); // make room
 +      try (TraceScope gcOuterSpan = Trace.startSpan("gc", sampler)) {
 +        try (TraceScope gcSpan = Trace.startSpan("loop")) {
 +          tStart = System.currentTimeMillis();
 +          try {
 +            System.gc(); // make room
  
 -        status.current.started = System.currentTimeMillis();
 +            status.current.started = System.currentTimeMillis();
  
 -        new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME));
 -        new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME));
 +            new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME));
 +            new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME));
  
 -        log.info("Number of data file candidates for deletion: " + status.current.candidates);
 -        log.info("Number of data file candidates still in use: " + status.current.inUse);
 -        log.info("Number of successfully deleted data files: " + status.current.deleted);
 -        log.info("Number of data files delete failures: " + status.current.errors);
 -
 -        status.current.finished = System.currentTimeMillis();
 -        status.last = status.current;
 -        status.current = new GcCycleStats();
 -
 -      } catch (Exception e) {
 -        log.error("{}", e.getMessage(), e);
 -      }
 +            log.info("Number of data file candidates for deletion: {}", status.current.candidates);
 +            log.info("Number of data file candidates still in use: {}", status.current.inUse);
 +            log.info("Number of successfully deleted data files: {}", status.current.deleted);
 +            log.info("Number of data files delete failures: {}", status.current.errors);
  
 -      tStop = System.currentTimeMillis();
 -      log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
 +            status.current.finished = System.currentTimeMillis();
 +            status.last = status.current;
 +            status.current = new GcCycleStats();
  
 -      // We want to prune references to fully-replicated WALs from the replication table which are
 -      // no longer referenced in the metadata table
 -      // before running GarbageCollectWriteAheadLogs to ensure we delete as many files as possible.
 -      Span replSpan = Trace.start("replicationClose");
 -      try {
 -        CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(this);
 -        closeWals.run();
 -      } catch (Exception e) {
 -        log.error("Error trying to close write-ahead logs for replication table", e);
 -      } finally {
 -        replSpan.stop();
 -      }
 +          } catch (Exception e) {
 +            log.error("{}", e.getMessage(), e);
 +          }
  
 -      Span waLogs = Trace.start("walogs");
 +          tStop = System.currentTimeMillis();
 +          log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
 +
 +          /*
 +           * We want to prune references to fully-replicated WALs from the replication table which
 +           * are no longer referenced in the metadata table before running
 +           * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible.
 +           */
 +          try (TraceScope replSpan = Trace.startSpan("replicationClose")) {
 +            CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(getContext());
 +            closeWals.run();
 +          } catch (Exception e) {
 +            log.error("Error trying to close write-ahead logs for replication table", e);
 +          }
  
 -      try {
 -        GarbageCollectWriteAheadLogs walogCollector =
 -            new GarbageCollectWriteAheadLogs(this, fs, liveTServerSet, isUsingTrash());
 -        log.info("Beginning garbage collection of write-ahead logs");
 -        walogCollector.collect(status);
 -      } catch (Exception e) {
 -        log.error("{}", e.getMessage(), e);
 -      } finally {
 -        waLogs.stop();
 -      }
 -      gcSpan.stop();
 +          // Clean up any unused write-ahead logs
 +          try (TraceScope waLogs = Trace.startSpan("walogs")) {
 +            GarbageCollectWriteAheadLogs walogCollector =
-                 new GarbageCollectWriteAheadLogs(getContext(), fs, isUsingTrash());
++                new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash());
 +            log.info("Beginning garbage collection of write-ahead logs");
 +            walogCollector.collect(status);
 +          } catch (Exception e) {
 +            log.error("{}", e.getMessage(), e);
 +          }
 +        }
  
 -      // we just made a lot of metadata changes: flush them out
 -      try {
 -        Connector connector = getConnector();
 -        connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
 -        connector.tableOperations().compact(RootTable.NAME, null, null, true, true);
 -      } catch (Exception e) {
 -        log.warn("{}", e.getMessage(), e);
 +        // we just made a lot of metadata changes: flush them out
 +        try {
 +          AccumuloClient accumuloClient = getContext();
 +          accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
 +          accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true);
 +        } catch (Exception e) {
 +          log.warn("{}", e.getMessage(), e);
 +        }
        }
 -
 -      Trace.off();
        try {
          long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
 -        log.debug("Sleeping for " + gcDelay + " milliseconds");
 +        log.debug("Sleeping for {} milliseconds", gcDelay);
          Thread.sleep(gcDelay);
        } catch (InterruptedException e) {
          log.warn("{}", e.getMessage(), e);
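
The hunks above convert manually paired Trace.start()/span.stop() calls into try-with-resources
blocks over TraceScope, so a span is closed on every exit path, including exceptions. A minimal
sketch of the pattern, using a hypothetical TinySpan stand-in (not the real tracing type) so it
runs without the tracing library:

    public class SpanScopeSketch {

      // Hypothetical stand-in for a TraceScope-like handle; the real code
      // obtains one from Trace.startSpan(...) and relies on it being closeable.
      static final class TinySpan implements AutoCloseable {
        private final String name;
        private final long start = System.currentTimeMillis();

        TinySpan(String name) {
          this.name = name;
          System.out.println("begin " + name);
        }

        @Override
        public void close() {
          System.out.printf("end %s after %d ms%n", name,
              System.currentTimeMillis() - start);
        }
      }

      public static void main(String[] args) {
        // Nested scopes mirror the gc/loop nesting above; close() runs in
        // reverse order even if the body throws, which the old manual
        // span.stop() calls only guaranteed when every exit path remembered
        // to invoke them.
        try (TinySpan gc = new TinySpan("gc"); TinySpan loop = new TinySpan("loop")) {
          System.out.println("collect cycle body");
        }
      }
    }
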
diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 21dcf42,0c812a6..4d03f9d
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@@ -62,12 -65,13 +62,13 @@@ public class GarbageCollectWriteAheadLo
    private final Collection<Collection<String>> walogs = Collections.emptyList();
    private final TabletLocationState tabletAssignedToServer1;
    private final TabletLocationState tabletAssignedToServer2;
+ 
    {
      try {
 -      tabletAssignedToServer1 = new TabletLocationState(extent, (TServerInstance) null, server1,
 -          (TServerInstance) null, null, walogs, false);
 -      tabletAssignedToServer2 = new TabletLocationState(extent, (TServerInstance) null, server2,
 -          (TServerInstance) null, null, walogs, false);
 +      tabletAssignedToServer1 =
 +          new TabletLocationState(extent, null, server1, null, null, walogs, false);
 +      tabletAssignedToServer2 =
 +          new TabletLocationState(extent, null, server2, null, null, walogs, false);
      } catch (Exception ex) {
        throw new RuntimeException(ex);
      }
@@@ -151,11 -164,16 +158,15 @@@
      Scanner rscanner = EasyMock.createMock(Scanner.class);
  
      GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ 
+     tserverSet.scanServers();
+     EasyMock.expectLastCall();
      EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ 
      EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
     EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
 -    EasyMock.expect(context.getConnector()).andReturn(conn);
  
 -    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
 +    EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
          .andReturn(rscanner);
      rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
      EasyMock.expectLastCall().once();
@@@ -195,11 -214,16 +206,15 @@@
      Scanner rscanner = EasyMock.createMock(Scanner.class);
  
      GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ 
+     tserverSet.scanServers();
+     EasyMock.expectLastCall();
      EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ 
      EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
     EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
 -    EasyMock.expect(context.getConnector()).andReturn(conn);
  
 -    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
 +    EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
          .andReturn(rscanner);
      rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
      EasyMock.expectLastCall().once();
@@@ -240,11 -265,15 +255,14 @@@
  
      GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
  
+     tserverSet.scanServers();
+     EasyMock.expectLastCall();
      EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ 
      EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
     EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.UNREFERENCED, path));
 -    EasyMock.expect(context.getConnector()).andReturn(conn);
  
 -    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
 +    EasyMock.expect(context.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
          .andReturn(rscanner);
      rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
      EasyMock.expectLastCall().once();
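
The test hunks above expect the new call to the void method scanServers() by invoking it during
EasyMock's record phase and following it with expectLastCall(). A minimal sketch of that
record/replay/verify flow, against a hypothetical WalScanner interface rather than the real
LiveTServerSet:

    import org.easymock.EasyMock;

    public class VoidExpectationSketch {

      // Hypothetical interface standing in for the mocked collaborator.
      interface WalScanner {
        void scanServers();
        int currentCount();
      }

      public static void main(String[] args) {
        WalScanner mock = EasyMock.createMock(WalScanner.class);

        // Record phase: a bare call registers the void expectation ...
        mock.scanServers();
        EasyMock.expectLastCall();
        // ... while non-void methods use expect(...).andReturn(...).
        EasyMock.expect(mock.currentCount()).andReturn(1);

        EasyMock.replay(mock);

        // Exercise the mock the way the code under test would.
        mock.scanServers();
        System.out.println(mock.currentCount());

        // Fails if any recorded expectation was not satisfied.
        EasyMock.verify(mock);
      }
    }
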

