accumulo-commits mailing list archives

From: ctubb...@apache.org
Subject: [accumulo] 01/01: Merge branch '1.10' into main
Date: Thu, 20 Aug 2020 22:39:40 GMT
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 47d7b708cf14b9d935a889c1082e211075338ed1
Merge: 0c5bc31 dab5b80
Author: Christopher Tubbs <ctubbsii@apache.org>
AuthorDate: Thu Aug 20 18:38:46 2020 -0400

    Merge branch '1.10' into main

 pom.xml                                            | 19 ++----------
 .../replication/CloseWriteAheadLogReferences.java  | 27 ++++++++--------
 .../RemoveCompleteReplicationRecords.java          | 12 ++++----
 .../apache/accumulo/test/functional/ScannerIT.java | 36 ++++++++--------------
 4 files changed, 34 insertions(+), 60 deletions(-)

diff --cc pom.xml
index aec76ba,5f1fae3..224aeca
--- a/pom.xml
+++ b/pom.xml
@@@ -127,24 -127,26 +127,26 @@@
      <failsafe.groups />
      <!-- surefire/failsafe plugin option -->
      <forkCount>1</forkCount>
 -    <!-- version that works when building with Hadoop 2 or 3.0; use 27.0-jre for 3.1 and later -->
 -    <guava.version>14.0.1</guava.version>
 -    <hadoop.version>2.6.5</hadoop.version>
 +    <hadoop.version>3.2.1</hadoop.version>
 +    <hk2.version>2.6.1</hk2.version>
      <htrace.hadoop.version>4.1.0-incubating</htrace.hadoop.version>
 -    <htrace.version>3.1.0-incubating</htrace.version>
 +    <htrace.version>3.2.0-incubating</htrace.version>
      <it.failIfNoSpecifiedTests>false</it.failIfNoSpecifiedTests>
 -    <!-- jetty 9.2 is the last version to support jdk less than 1.8 -->
 -    <jetty.version>9.2.26.v20180806</jetty.version>
 -    <maven.compiler.release>8</maven.compiler.release>
 -    <maven.compiler.source>1.8</maven.compiler.source>
 -    <maven.compiler.target>1.8</maven.compiler.target>
 -    <maven.plugin-version>3.5.0</maven.plugin-version>
 +    <javax.el.version>3.0.1-b06</javax.el.version>
 +    <jaxb.version>2.3.0.1</jaxb.version>
 +    <jersey.version>2.30.1</jersey.version>
 +    <jetty.version>9.4.27.v20200227</jetty.version>
 +    <maven.compiler.release>11</maven.compiler.release>
 +    <maven.compiler.source>11</maven.compiler.source>
 +    <maven.compiler.target>11</maven.compiler.target>
      <!-- surefire/failsafe plugin option -->
      <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
 -    <powermock.version>2.0.2</powermock.version>
 +    <powermock.version>2.0.5</powermock.version>
+     <!-- timestamp for reproducible outputs, updated on release by the release plugin -->
+     <project.build.outputTimestamp>2020-08-18T00:00:00Z</project.build.outputTimestamp>
      <!-- surefire/failsafe plugin option -->
      <reuseForks>false</reuseForks>
 -    <slf4j.version>1.7.25</slf4j.version>
 +    <slf4j.version>1.7.30</slf4j.version>
      <sourceReleaseAssemblyDescriptor>source-release-tar</sourceReleaseAssemblyDescriptor>
      <surefire.excludedGroups />
      <surefire.failIfNoSpecifiedTests>false</surefire.failIfNoSpecifiedTests>
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 7f5a7c4,5dd25a1..479618f
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@@ -18,8 -16,10 +18,9 @@@
   */
  package org.apache.accumulo.gc.replication;
  
+ import java.time.Duration;
  import java.util.Collections;
  import java.util.HashSet;
 -import java.util.List;
  import java.util.Map.Entry;
  import java.util.Set;
  
@@@ -76,36 -83,49 +76,35 @@@ public class CloseWriteAheadLogReferenc
  
    @Override
    public void run() {
-     // As long as we depend on a newer Guava than Hadoop uses, we have to make sure we're compatible
-     // with
-     // what the version they bundle uses.
-     Stopwatch sw = Stopwatch.createUnstarted();
 -    // Guava Stopwatch is useful here, for a friendlier toString, but the version of Guava in Hadoop
 -    // 2 and 3 are different in incompatible ways, so we avoid it here and use Duration instead, so
 -    // there won't be conflicts with the older Guava that ships by default with Hadoop 2.
++    // Guava Stopwatch is useful here, for a friendlier toString, but the versions of Guava
++    // are different in incompatible ways, so we avoid it here and use Duration instead, so
++    // there won't be conflicts.
+     long startTime;
+     Duration duration;
  
 -    Connector conn;
 -    try {
 -      conn = context.getConnector();
 -    } catch (Exception e) {
 -      log.error("Could not create connector", e);
 -      throw new RuntimeException(e);
 -    }
 -
 -    if (!ReplicationTable.isOnline(conn)) {
 +    if (!ReplicationTable.isOnline(context)) {
        log.debug("Replication table isn't online, not attempting to clean up wals");
        return;
      }
  
 -    Span findWalsSpan = Trace.start("findReferencedWals");
      HashSet<String> closed = null;
 -    try {
 +    try (TraceScope findWalsSpan = Trace.startSpan("findReferencedWals")) {
-       sw.start();
+       startTime = System.nanoTime();
 -      closed = getClosedLogs(conn);
 +      closed = getClosedLogs();
-     } finally {
-       sw.stop();
+       duration = Duration.ofNanos(System.nanoTime() - startTime);
 -    } finally {
 -      findWalsSpan.stop();
      }
  
-     log.info("Found {} WALs referenced in metadata in {}", closed.size(), sw);
-     sw.reset();
 -    log.info("Found " + closed.size() + " WALs referenced in metadata in " + duration);
++    log.info("Found {} WALs referenced in metadata in {}", closed.size(), duration);
  
 -    Span updateReplicationSpan = Trace.start("updateReplicationTable");
      long recordsClosed = 0;
 -    try {
 +    try (TraceScope updateReplicationSpan = Trace.startSpan("updateReplicationTable")) {
-       sw.start();
+       startTime = System.nanoTime();
 -      recordsClosed = updateReplicationEntries(conn, closed);
 +      recordsClosed = updateReplicationEntries(context, closed);
-     } finally {
-       sw.stop();
+       duration = Duration.ofNanos(System.nanoTime() - startTime);
 -    } finally {
 -      updateReplicationSpan.stop();
      }
  
-     log.info("Closed {} WAL replication references in replication table in {}", recordsClosed,
sw);
 -    log.info("Closed " + recordsClosed + " WAL replication references in replication table
in "
 -        + duration);
++    log.info("Closed {} WAL replication references in replication table in {}", recordsClosed,
++        duration);
    }
  
    /**
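
The change above replaces Guava's Stopwatch with plain System.nanoTime() plus java.time.Duration, so the timing code no longer depends on whichever Guava version Hadoop happens to bundle; the same swap is repeated in the two files below. A minimal, self-contained sketch of that idiom (the class name and the sleep are illustrative stand-ins, not Accumulo code):

    import java.time.Duration;

    public class TimingSketch {
      public static void main(String[] args) throws InterruptedException {
        // System.nanoTime() is monotonic, so it is safe for measuring elapsed time
        // even if the wall clock is adjusted while the work runs.
        long startTime = System.nanoTime();

        Thread.sleep(250); // stand-in for the work being timed

        // Wrapping the elapsed nanoseconds in a Duration gives a readable toString()
        // (e.g. "PT0.25S") without pulling in Guava's Stopwatch.
        Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
        System.out.println("work took " + duration);
      }
    }
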
diff --cc server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index 4eeca2c,0000000..35418f5
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@@ -1,226 -1,0 +1,226 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.master.replication;
 +
 +import java.io.IOException;
++import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
 +import org.apache.hadoop.io.Text;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Stopwatch;
 +import com.google.protobuf.InvalidProtocolBufferException;
 +
 +/**
 + * Delete replication entries from the replication table that are fully replicated and closed
 + */
 +public class RemoveCompleteReplicationRecords implements Runnable {
 +  private static final Logger log = LoggerFactory.getLogger(RemoveCompleteReplicationRecords.class);
 +
 +  private AccumuloClient client;
 +
 +  public RemoveCompleteReplicationRecords(AccumuloClient client) {
 +    this.client = client;
 +  }
 +
 +  @Override
 +  public void run() {
 +    BatchScanner bs;
 +    BatchWriter bw;
 +    try {
 +      bs = ReplicationTable.getBatchScanner(client, 4);
 +      bw = ReplicationTable.getBatchWriter(client);
 +
 +      if (bs == null || bw == null)
 +        throw new AssertionError("Inconceivable; an exception should have been"
 +            + " thrown, but 'bs' or 'bw' was null instead");
 +    } catch (ReplicationTableOfflineException e) {
 +      log.debug("Not attempting to remove complete replication records as the"
 +          + " table ({}) isn't yet online", ReplicationTable.NAME);
 +      return;
 +    }
 +
 +    bs.setRanges(Collections.singleton(new Range()));
 +    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
 +    StatusSection.limit(bs);
 +    WorkSection.limit(bs);
 +    bs.addScanIterator(cfg);
 +
-     Stopwatch sw = Stopwatch.createUnstarted();
 +    long recordsRemoved = 0;
++    long startTime = System.nanoTime();
++    Duration duration;
 +    try {
-       sw.start();
 +      recordsRemoved = removeCompleteRecords(client, bs, bw);
 +    } finally {
 +      bs.close();
 +      try {
 +        bw.close();
 +      } catch (MutationsRejectedException e) {
 +        log.error("Error writing mutations to {}, will retry", ReplicationTable.NAME, e);
 +      }
-       sw.stop();
++      duration = Duration.ofNanos(System.nanoTime() - startTime);
 +    }
 +
-     log.info("Removed {} complete replication entries from the table {}", recordsRemoved,
-         ReplicationTable.NAME);
++    log.info("Removed {} complete replication entries from the table {} in {}", recordsRemoved,
++        ReplicationTable.NAME, duration);
 +  }
 +
 +  /**
 +   * Removes {@link Status} records read from the given {@code bs} and writes a delete, using the
 +   * given {@code bw}, when that {@link Status} is fully replicated and closed, as defined by
 +   * {@link StatusUtil#isSafeForRemoval(org.apache.accumulo.server.replication.proto.Replication.Status)}.
 +   *
 +   * @param client
 +   *          Accumulo client
 +   * @param bs
 +   *          A BatchScanner to read replication status records from
 +   * @param bw
 +   *          A BatchWriter to write deletes to
 +   * @return Number of records removed
 +   */
 +  protected long removeCompleteRecords(AccumuloClient client, BatchScanner bs, BatchWriter bw) {
 +    Text row = new Text(), colf = new Text(), colq = new Text();
 +    long recordsRemoved = 0;
 +
 +    // For each row in the replication table
 +    for (Entry<Key,Value> rowEntry : bs) {
 +      SortedMap<Key,Value> columns;
 +      try {
 +        columns = WholeRowIterator.decodeRow(rowEntry.getKey(), rowEntry.getValue());
 +      } catch (IOException e) {
 +        log.error("Could not deserialize {} with WholeRowIterator", rowEntry.getKey().getRow(),
e);
 +        continue;
 +      }
 +
 +      rowEntry.getKey().getRow(row);
 +
 +      // Try to remove the row (all or nothing)
 +      recordsRemoved += removeRowIfNecessary(bw, columns, row, colf, colq);
 +    }
 +
 +    return recordsRemoved;
 +  }
 +
 +  protected long removeRowIfNecessary(BatchWriter bw, SortedMap<Key,Value> columns, Text row,
 +      Text colf, Text colq) {
 +    long recordsRemoved = 0;
 +    if (columns.isEmpty()) {
 +      return recordsRemoved;
 +    }
 +
 +    Mutation m = new Mutation(row);
 +    Map<TableId,Long> tableToTimeCreated = new HashMap<>();
 +    for (Entry<Key,Value> entry : columns.entrySet()) {
 +      Status status = null;
 +      try {
 +        status = Status.parseFrom(entry.getValue().get());
 +      } catch (InvalidProtocolBufferException e) {
 +        log.error("Encountered unparsable protobuf for key: {}",
 +            entry.getKey().toStringNoTruncate());
 +        continue;
 +      }
 +
 +      // If a column in the row isn't ready for removal, we keep the whole row
 +      if (!StatusUtil.isSafeForRemoval(status)) {
 +        return 0L;
 +      }
 +
 +      Key k = entry.getKey();
 +      k.getColumnFamily(colf);
 +      k.getColumnQualifier(colq);
 +
 +      log.debug("Removing {} {}:{} from replication table", row, colf, colq);
 +
 +      m.putDelete(colf, colq);
 +
 +      TableId tableId;
 +      if (StatusSection.NAME.equals(colf)) {
 +        tableId = TableId.of(colq.toString());
 +      } else if (WorkSection.NAME.equals(colf)) {
 +        ReplicationTarget target = ReplicationTarget.from(colq);
 +        tableId = target.getSourceTableId();
 +      } else {
 +        throw new RuntimeException("Got unexpected column");
 +      }
 +
 +      if (status.hasCreatedTime()) {
 +        Long timeClosed = tableToTimeCreated.get(tableId);
 +        if (timeClosed == null) {
 +          tableToTimeCreated.put(tableId, status.getCreatedTime());
 +        } else if (timeClosed != status.getCreatedTime()) {
 +          log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed,
 +              status.getCreatedTime());
 +        }
 +      }
 +
 +      recordsRemoved++;
 +    }
 +
 +    List<Mutation> mutations = new ArrayList<>();
 +    mutations.add(m);
 +    for (Entry<TableId,Long> entry : tableToTimeCreated.entrySet()) {
 +      log.info("Removing order mutation for table {} at {} for {}", entry.getKey(),
 +          entry.getValue(), row);
 +      Mutation orderMutation = OrderSection.createMutation(row.toString(), entry.getValue());
 +      orderMutation.putDelete(OrderSection.NAME, new Text(entry.getKey().canonical()));
 +      mutations.add(orderMutation);
 +    }
 +
 +    // Send the mutation deleting all the columns at once.
 +    // If we send them not as a single Mutation, we run the risk of having some of them be applied
 +    // which would mean that we might accidentally re-replicate data. We want to get rid of them all
 +    // at once
 +    // or not at all.
 +    try {
 +      bw.addMutations(mutations);
 +      bw.flush();
 +    } catch (MutationsRejectedException e) {
 +      log.error("Could not submit mutation to remove columns for {} in replication table",
row, e);
 +      return 0L;
 +    }
 +
 +    return recordsRemoved;
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 5b3b2a0,d3ba259..85d8b2b
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@@ -22,11 -20,10 +22,10 @@@ import static org.junit.Assert.assertTr
  
  import java.util.Iterator;
  import java.util.Map.Entry;
- import java.util.concurrent.TimeUnit;
  
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.BatchWriter;
 -import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;
@@@ -38,8 -35,9 +37,6 @@@ import org.apache.accumulo.fate.util.Ut
  import org.apache.accumulo.harness.AccumuloClusterHarness;
  import org.junit.Test;
  
- import com.google.common.base.Stopwatch;
- 
 -/**
 - *
 - */
  public class ScannerIT extends AccumuloClusterHarness {
  
    @Override
@@@ -50,78 -48,68 +47,71 @@@
    @Test
    public void testScannerReadaheadConfiguration() throws Exception {
      final String table = getUniqueNames(1)[0];
 -    Connector c = getConnector();
 -    c.tableOperations().create(table);
 -
 -    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
 -
 -    Mutation m = new Mutation("a");
 -    for (int i = 0; i < 10; i++) {
 -      m.put(Integer.toString(i), "", "");
 +    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
 +      c.tableOperations().create(table);
 +
 +      try (BatchWriter bw = c.createBatchWriter(table)) {
 +        Mutation m = new Mutation("a");
 +        for (int i = 0; i < 10; i++) {
 +          m.put(Integer.toString(i), "", "");
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      IteratorSetting cfg;
-       Stopwatch sw;
 +      Iterator<Entry<Key,Value>> iterator;
++      long nanosWithWait = 0;
 +      try (Scanner s = c.createScanner(table, new Authorizations())) {
 +
 +        cfg = new IteratorSetting(100, SlowIterator.class);
 +        // A batch size of one will end up calling seek() for each element with no calls to next()
 +        SlowIterator.setSeekSleepTime(cfg, 100L);
 +
 +        s.addScanIterator(cfg);
 +        // Never start readahead
 +        s.setReadaheadThreshold(Long.MAX_VALUE);
 +        s.setBatchSize(1);
 +        s.setRange(new Range());
 +
-         sw = Stopwatch.createUnstarted();
 +        iterator = s.iterator();
- 
-         sw.start();
++        long startTime = System.nanoTime();
 +        while (iterator.hasNext()) {
-           sw.stop();
++          nanosWithWait += System.nanoTime() - startTime;
 +
 +          // While we "do work" in the client, we should be fetching the next result
 +          UtilWaitThread.sleep(100L);
 +          iterator.next();
-           sw.start();
++          startTime = System.nanoTime();
 +        }
-         sw.stop();
++        nanosWithWait += System.nanoTime() - startTime;
 +      }
 +
-       long millisWithWait = sw.elapsed(TimeUnit.MILLISECONDS);
- 
++      long nanosWithNoWait = 0;
 +      try (Scanner s = c.createScanner(table, new Authorizations())) {
 +        s.addScanIterator(cfg);
 +        s.setRange(new Range());
 +        s.setBatchSize(1);
 +        s.setReadaheadThreshold(0L);
 +
-         sw = Stopwatch.createUnstarted();
 +        iterator = s.iterator();
- 
-         sw.start();
++        long startTime = System.nanoTime();
 +        while (iterator.hasNext()) {
-           sw.stop();
++          nanosWithNoWait += System.nanoTime() - startTime;
 +
 +          // While we "do work" in the client, we should be fetching the next result
 +          UtilWaitThread.sleep(100L);
 +          iterator.next();
-           sw.start();
++          startTime = System.nanoTime();
 +        }
-         sw.stop();
- 
-         long millisWithNoWait = sw.elapsed(TimeUnit.MILLISECONDS);
++        nanosWithNoWait += System.nanoTime() - startTime;
 +
 +        // The "no-wait" time should be much less than the "wait-time"
 +        assertTrue(
-             "Expected less time to be taken with immediate readahead (" + millisWithNoWait
-                 + ") than without immediate readahead (" + millisWithWait + ")",
-             millisWithNoWait < millisWithWait);
++            "Expected less time to be taken with immediate readahead (" + nanosWithNoWait
++                + ") than without immediate readahead (" + nanosWithWait + ")",
++            nanosWithNoWait < nanosWithWait);
 +      }
      }
 -
 -    bw.addMutation(m);
 -    bw.close();
 -
 -    Scanner s = c.createScanner(table, new Authorizations());
 -
 -    IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
 -    // A batch size of one will end up calling seek() for each element with no calls to next()
 -    SlowIterator.setSeekSleepTime(cfg, 100l);
 -
 -    s.addScanIterator(cfg);
 -    // Never start readahead
 -    s.setReadaheadThreshold(Long.MAX_VALUE);
 -    s.setBatchSize(1);
 -    s.setRange(new Range());
 -
 -    Iterator<Entry<Key,Value>> iterator = s.iterator();
 -    long nanosWithWait = 0;
 -    long startTime = System.nanoTime();
 -    while (iterator.hasNext()) {
 -      nanosWithWait += System.nanoTime() - startTime;
 -
 -      // While we "do work" in the client, we should be fetching the next result
 -      UtilWaitThread.sleep(100l);
 -      iterator.next();
 -      startTime = System.nanoTime();
 -    }
 -    nanosWithWait += System.nanoTime() - startTime;
 -
 -    s = c.createScanner(table, new Authorizations());
 -    s.addScanIterator(cfg);
 -    s.setRange(new Range());
 -    s.setBatchSize(1);
 -    s.setReadaheadThreshold(0l);
 -
 -    iterator = s.iterator();
 -    long nanosWithNoWait = 0;
 -    startTime = System.nanoTime();
 -    while (iterator.hasNext()) {
 -      nanosWithNoWait += System.nanoTime() - startTime;
 -
 -      // While we "do work" in the client, we should be fetching the next result
 -      UtilWaitThread.sleep(100l);
 -      iterator.next();
 -      startTime = System.nanoTime();
 -    }
 -    nanosWithNoWait += System.nanoTime() - startTime;
 -
 -    // The "no-wait" time should be much less than the "wait-time"
 -    assertTrue(
 -        "Expected less time to be taken with immediate readahead (" + nanosWithNoWait
 -            + ") than without immediate readahead (" + nanosWithWait + ")",
 -        nanosWithNoWait < nanosWithWait);
    }
  
  }
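
For reference, the rewritten ScannerIT uses the 2.x AccumuloClient API with try-with-resources and the Scanner readahead/batch-size knobs exercised above. A minimal sketch of that usage outside the test harness (the properties path and table name are placeholders, and exceptions simply propagate):

    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class ReadaheadSketch {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {

          client.tableOperations().create("sketchtable"); // assumes the table does not exist yet

          try (BatchWriter bw = client.createBatchWriter("sketchtable")) {
            Mutation m = new Mutation("a");
            m.put("fam", "qual", "value");
            bw.addMutation(m);
          } // close() flushes any buffered mutations

          try (Scanner s = client.createScanner("sketchtable", Authorizations.EMPTY)) {
            s.setBatchSize(1);
            // A threshold of 0 starts background readahead immediately;
            // Long.MAX_VALUE effectively disables it, as in the test above.
            s.setReadaheadThreshold(0L);
            s.setRange(new Range());
            for (Entry<Key,Value> entry : s) {
              System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
          }
        }
      }
    }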

