accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch main updated: Adds Ample support for tablets and uses this in compaction finalizer (#2152)
Date Wed, 09 Jun 2021 21:53:11 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new cd58dbb  Adds Ample support for tablets and uses this in compaction finalizer (#2152)
cd58dbb is described below

commit cd58dbb0198601ffda5e1162a757648861c8ea14
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jun 9 17:53:02 2021 -0400

    Adds Ample support for tablets and uses this in compaction finalizer (#2152)
---
 .../core/metadata/schema/TabletMetadata.java       |  25 -----
 .../core/metadata/schema/TabletsMetadata.java      | 113 ++++++++++++++++++---
 .../accumulo/coordinator/CompactionFinalizer.java  |  26 +++--
 3 files changed, 114 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index 741288d..fc98313 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -39,15 +39,11 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.function.Function;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.RowIterator;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -83,7 +79,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterators;
 
 public class TabletMetadata {
   private static final Logger log = LoggerFactory.getLogger(TabletMetadata.class);
@@ -429,26 +424,6 @@ public class TabletMetadata {
     location = new Location(val, qual, lt);
   }
 
-  static Iterable<TabletMetadata> convert(Scanner input, EnumSet<ColumnType> fetchedColumns,
-      boolean checkConsistency, boolean buildKeyValueMap) {
-
-    Range range = input.getRange();
-
-    Function<Range,Iterator<TabletMetadata>> iterFactory = r -> {
-      synchronized (input) {
-        input.setRange(r);
-        RowIterator rowIter = new RowIterator(input);
-        return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns, buildKeyValueMap));
-      }
-    };
-
-    if (checkConsistency) {
-      return () -> new LinkingIterator(iterFactory, range);
-    } else {
-      return () -> iterFactory.apply(range);
-    }
-  }
-
   @VisibleForTesting
   static TabletMetadata create(String id, String prevEndRow, String endRow) {
     TabletMetadata te = new TabletMetadata();
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 1d6fc89..042786c 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -18,30 +18,41 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.stream.Collectors.toList;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -66,6 +77,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 
 /**
  * An abstraction layer for reading tablet metadata from the accumulo.metadata and accumulo.root
@@ -87,6 +99,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
     private TableId tableId;
     private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE;
     private AccumuloClient _client;
+    private Collection<KeyExtent> extents;
 
     Builder(AccumuloClient client) {
       this._client = client;
@@ -94,7 +107,14 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
 
     @Override
     public TabletsMetadata build() {
-      Preconditions.checkState((level == null) != (table == null),
+      if (extents != null) {
+        // setting multiple extents with forTablets(extents) is mutually exclusive with these
+        // single-tablet options
+        checkState(range == null && table == null && level == DataLevel.USER && !checkConsistency);
+        return buildExtents(_client);
+      }
+
+      checkState((level == null) != (table == null),
           "scanTable() cannot be used in conjunction with forLevel(), forTable() or forTablet()");
       if (level == DataLevel.ROOT) {
         ClientContext ctx = ((ClientContext) _client);
@@ -104,6 +124,34 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
       }
     }
 
+    private TabletsMetadata buildExtents(AccumuloClient client) {
+
+      try {
+        BatchScanner scanner = client.createBatchScanner(level.metaTable(), Authorizations.EMPTY);
+
+        var ranges = extents.stream().map(KeyExtent::toMetaRange).collect(toList());
+        scanner.setRanges(ranges);
+
+        configureColumns(scanner);
+        IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class);
+        scanner.addScanIterator(iterSetting);
+
+        Iterable<TabletMetadata> tmi = () -> Iterators.transform(scanner.iterator(), entry -> {
+          try {
+            return TabletMetadata.convertRow(
+                WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()).entrySet().iterator(),
+                fetchedCols, saveKeyValues);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+
+        return new TabletsMetadata(scanner, tmi);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     private TabletsMetadata buildNonRoot(AccumuloClient client) {
       try {
 
@@ -117,21 +165,25 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
           fetch(ColumnType.PREV_ROW);
         }
 
-        for (Text fam : families) {
-          scanner.fetchColumnFamily(fam);
-        }
-
-        for (ColumnFQ col : qualifiers) {
-          col.fetch(scanner);
-        }
-
-        if (families.isEmpty() && qualifiers.isEmpty()) {
-          fetchedCols = EnumSet.allOf(ColumnType.class);
+        configureColumns(scanner);
+        Range range1 = scanner.getRange();
+
+        Function<Range,Iterator<TabletMetadata>> iterFactory = r -> {
+          synchronized (scanner) {
+            scanner.setRange(r);
+            RowIterator rowIter = new RowIterator(scanner);
+            return Iterators.transform(rowIter,
+                ri -> TabletMetadata.convertRow(ri, fetchedCols, saveKeyValues));
+          }
+        };
+
+        Iterable<TabletMetadata> tmi;
+        if (checkConsistency) {
+          tmi = () -> new LinkingIterator(iterFactory, range1);
+        } else {
+          tmi = () -> iterFactory.apply(range1);
         }
 
-        Iterable<TabletMetadata> tmi =
-            TabletMetadata.convert(scanner, fetchedCols, checkConsistency, saveKeyValues);
-
         if (endRow != null) {
           // create an iterable that will stop at the tablet which contains the endRow
           return new TabletsMetadata(scanner,
@@ -144,8 +196,17 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
       }
     }
 
+    private void configureColumns(ScannerBase scanner) {
+      families.forEach(scanner::fetchColumnFamily);
+      qualifiers.forEach(col -> col.fetch(scanner));
+      if (families.isEmpty() && qualifiers.isEmpty()) {
+        fetchedCols = EnumSet.allOf(ColumnType.class);
+      }
+    }
+
     @Override
     public Options checkConsistency() {
+      checkState(extents == null, "Unable to check consistency of non-contiguous tablets");
       this.checkConsistency = true;
       return this;
     }
@@ -234,6 +295,19 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
     }
 
     @Override
+    public Options forTablets(Collection<KeyExtent> extents) {
+      if (extents.stream().map(KeyExtent::tableId).map(DataLevel::of)
+          .anyMatch(dl -> dl != DataLevel.USER)) {
+        throw new IllegalArgumentException(
+            "readTablets only supported for user tablets at this time.");
+      }
+
+      this.level = DataLevel.USER;
+      this.extents = extents;
+      return this;
+    }
+
+    @Override
     public Options overRange(Range range) {
       this.range = TabletsSection.getRange().clip(range);
       return this;
@@ -307,6 +381,13 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
     Options forTablet(KeyExtent extent);
 
     /**
+     * Get the tablet metadata for the given extents. This will find tablets based on end row, so
+     * it's possible the prev rows could differ for the tablets returned. If this matters, then it
+     * must be checked.
+     */
+    Options forTablets(Collection<KeyExtent> extents);
+
+    /**
      * This method automatically determines where the metadata for the passed in table ID resides.
      * For example if a user tablet ID is passed in, then the metadata table is scanned. If the
      * metadata table ID is passed in then the root table is scanned. Defaults to returning all
@@ -406,7 +487,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
         .convertToTabletMetadata();
   }
 
-  private Scanner scanner;
+  private ScannerBase scanner;
 
   private Iterable<TabletMetadata> tablets;
 
@@ -415,7 +496,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
     this.tablets = Collections.singleton(tm);
   }
 
-  private TabletsMetadata(Scanner scanner, Iterable<TabletMetadata> tmi) {
+  private TabletsMetadata(ScannerBase scanner, Iterable<TabletMetadata> tmi) {
     this.scanner = scanner;
     this.tablets = tmi;
   }
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 991f98f..c9e8bf0 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.accumulo.coordinator;
 
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -40,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
@@ -148,16 +153,19 @@ public class CompactionFinalizer {
 
         List<ExternalCompactionId> statusesToDelete = new ArrayList<>();
 
+        Map<KeyExtent,TabletMetadata> tabletsMetadata;
+        var extents = batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList());
+        try (TabletsMetadata tablets = context.getAmple().readTablets().forTablets(extents)
+            .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) {
+          tabletsMetadata = tablets.stream().collect(toMap(TabletMetadata::getExtent, identity()));
+        }
+
         for (ExternalCompactionFinalState ecfs : batch) {
-          // TODO: use #1974 for more efficient metadata reads
-          TabletMetadata tabletMetadata =
-              context.getAmple().readTablets().forTablet(ecfs.getExtent())
-                  .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()
-                  .stream().findFirst().orElse(null);
-
-          if (tabletMetadata == null || !tabletMetadata.getExtent().equals(ecfs.getExtent())
-              || !tabletMetadata.getExternalCompactions().keySet()
-                  .contains(ecfs.getExternalCompactionId())) {
+
+          TabletMetadata tabletMetadata = tabletsMetadata.get(ecfs.getExtent());
+
+          if (tabletMetadata == null || !tabletMetadata.getExternalCompactions().keySet()
+              .contains(ecfs.getExternalCompactionId())) {
             // there is not per tablet external compaction entry, so delete its final state marker
             // from metadata table
             LOG.debug(

Mime
View raw message