From commits-return-21878-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu May 24 17:56:00 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 76A26180636 for ; Thu, 24 May 2018 17:55:58 +0200 (CEST) Received: (qmail 68858 invoked by uid 500); 24 May 2018 15:55:57 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 68849 invoked by uid 99); 24 May 2018 15:55:57 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 May 2018 15:55:57 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id AB30782AC1; Thu, 24 May 2018 15:55:56 +0000 (UTC) Date: Thu, 24 May 2018 15:55:56 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: fixes #469 made bulk import scan split tolerant (#488) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152717735659.1939.3958954079114570426@gitbox.apache.org> From: kturner@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: f1e56aae347d233076142074fc3ec219a395c84d X-Git-Newrev: 9feb5e1dce0082b188a70753f3587bc1ba085c0f X-Git-Rev: 9feb5e1dce0082b188a70753f3587bc1ba085c0f X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 9feb5e1 fixes #469 made bulk import scan split tolerant (#488) 9feb5e1 is described below commit 9feb5e1dce0082b188a70753f3587bc1ba085c0f Author: Keith Turner AuthorDate: Mon May 14 18:52:25 2018 -0400 fixes #469 made bulk import scan split tolerant (#488) * Made MetadataScanner check structure and retry when linked list broken * Removed TabletIterator * Made code that used TabletIterator use MetadataScanner --- .../core/metadata/schema/LinkingIterator.java | 154 ++++++++++++ .../core/metadata/schema/MetadataScanner.java | 218 ++++++++++++----- .../core/metadata/schema/MetadataSchema.java | 31 ++- .../metadata/schema/TabletDeletedException.java | 27 +++ .../core/metadata/schema/TabletMetadata.java | 194 +++++++++++---- .../org/apache/accumulo/core/summary/Gatherer.java | 12 +- .../core/metadata/schema/LinkingIteratorTest.java | 112 +++++++++ .../accumulo/server/util/MetadataTableUtil.java | 125 +++++----- .../accumulo/server/util/TabletIterator.java | 267 --------------------- .../accumulo/gc/GarbageCollectionAlgorithm.java | 42 ++-- .../accumulo/gc/GarbageCollectionEnvironment.java | 16 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 34 +-- .../apache/accumulo/gc/GarbageCollectionTest.java | 51 +--- .../master/MasterClientServiceHandler.java | 2 +- .../master/tableOps/bulkVer2/LoadFiles.java | 7 +- .../master/tableOps/bulkVer2/PrepBulkImport.java | 6 +- .../java/org/apache/accumulo/test/CloneIT.java | 6 +- .../apache/accumulo/test/functional/MergeIT.java | 78 ------ 18 files changed, 754 insertions(+), 628 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java new file mode 100644 index 0000000..e95a8af --- /dev/null +++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/LinkingIterator.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.metadata.schema; + +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +/** + * Tablets for a table in the metadata table should form a linked list. This iterator detects when + * tablets do not form a linked list and backs up when this happens. + * + *

<p> + * The purpose of this is to hide inconsistencies caused by splits and detect anomalies in the + * metadata table. + * + * <p>
+ * If a tablet that was returned by this iterator is subsequently deleted from the metadata table, + * then this iterator will throw a TabletDeletedException. This could occur when a table is merged. + */ +public class LinkingIterator implements Iterator { + + private static final Logger log = LoggerFactory.getLogger(LinkingIterator.class); + + private Range range; + private Function> iteratorFactory; + private Iterator source; + private TabletMetadata prevTablet = null; + + LinkingIterator(Function> iteratorFactory, Range range) { + this.range = range; + this.iteratorFactory = iteratorFactory; + source = iteratorFactory.apply(range); + } + + @Override + public boolean hasNext() { + return source.hasNext(); + } + + static boolean goodTransition(TabletMetadata prev, TabletMetadata curr) { + if (!curr.sawPrevEndRow()) { + log.warn("Tablet {} had no prev end row.", curr.getExtent()); + return false; + } + + if (!curr.getTableId().equals(prev.getTableId())) { + if (prev.getEndRow() != null) { + log.debug("Non-null end row for last tablet in table: " + prev.getExtent() + " " + + curr.getExtent()); + return false; + } + + if (curr.getPrevEndRow() != null) { + log.debug("First tablet for table had prev end row {} {} ", prev.getExtent(), + curr.getExtent()); + return false; + } + } else { + if (prev.getEndRow() == null) { + throw new IllegalStateException("Null end row for tablet in middle of table: " + + prev.getExtent() + " " + curr.getExtent()); + } + + if (curr.getPrevEndRow() == null || !prev.getEndRow().equals(curr.getPrevEndRow())) { + log.debug("Tablets end row and prev end row not equals {} {} ", prev.getExtent(), + curr.getExtent()); + return false; + } + } + + return true; + } + + private void resetSource() { + if (prevTablet == null) { + source = iteratorFactory.apply(range); + } else { + // get the metadata table row for the previous tablet + Text prevMetaRow = KeyExtent.getMetadataEntry(prevTablet.getTableId(), + prevTablet.getEndRow()); + + // ensure the 
previous tablet still exists in the metadata table + if (Iterators.size(iteratorFactory.apply(new Range(prevMetaRow))) == 0) { + throw new TabletDeletedException("Tablet " + prevMetaRow + " was deleted while iterating"); + } + + // start scanning at next possible row in metadata table + Range seekRange = new Range(new Key(prevMetaRow).followingKey(PartialKey.ROW), true, + range.getEndKey(), range.isEndKeyInclusive()); + + log.info("Resetting scanner to {}", seekRange); + + source = iteratorFactory.apply(seekRange); + } + } + + @Override + public TabletMetadata next() { + + long sleepTime = 250; + + TabletMetadata currTablet = null; + while (currTablet == null) { + TabletMetadata tmp = source.next(); + + if (prevTablet == null) { + if (tmp.sawPrevEndRow()) { + currTablet = tmp; + } else { + log.warn("Tablet has no prev end row " + tmp.getTableId() + " " + tmp.getEndRow()); + } + } else if (goodTransition(prevTablet, tmp)) { + currTablet = tmp; + } + + if (currTablet == null) { + sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + resetSource(); + sleepTime = Math.min(2 * sleepTime, 5000); + } + } + + prevTablet = currTablet; + return currTablet; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java index 44b7611..6fc05bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataScanner.java @@ -17,6 +17,8 @@ package org.apache.accumulo.core.metadata.schema; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; import 
java.util.ArrayList; @@ -24,58 +26,89 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.client.impl.Table.ID; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.FetchedColumns; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.hadoop.io.Text; -import 
com.google.common.base.Preconditions; - -public class MetadataScanner { +public class MetadataScanner implements Iterable, AutoCloseable { public interface SourceOptions { - TableOptions from(Scanner scanner); - TableOptions from(ClientContext ctx); + + TableOptions from(Connector conn); } - public interface TableOptions { - ColumnOptions overRootTable(); + public interface TableOptions extends RangeOptions { + /** + * Optionally set a table name, defaults to {@value MetadataTable#NAME} + */ + RangeOptions scanTable(String tableName); + } - ColumnOptions overMetadataTable(); + public interface RangeOptions { + Options overTabletRange(); - ColumnOptions overUserTableId(Table.ID tableId); + Options overRange(Range range); - ColumnOptions overUserTableId(Table.ID tableId, Text startRow, Text endRow); + Options overRange(Table.ID tableId); + + Options overRange(Table.ID tableId, Text startRow, Text endRow); } - public interface ColumnOptions { - ColumnOptions fetchFiles(); + public interface Options { + /** + * Checks that the metadata table forms a linked list and automatically backs up until it does. + */ + Options checkConsistency(); + + /** + * Saves the key values seen in the metadata table for each tablet. 
+ */ + Options saveKeyValues(); + + Options fetchFiles(); - ColumnOptions fetchLoaded(); + Options fetchLoaded(); - ColumnOptions fetchLocation(); + Options fetchLocation(); - ColumnOptions fetchPrev(); + Options fetchPrev(); - ColumnOptions fetchLast(); + Options fetchLast(); - Iterable build() + Options fetchScans(); + + Options fetchDir(); + + Options fetchTime(); + + Options fetchCloned(); + + MetadataScanner build() throws TableNotFoundException, AccumuloException, AccumuloSecurityException; } @@ -108,34 +141,41 @@ public class MetadataScanner { } } - private static class Builder implements SourceOptions, TableOptions, ColumnOptions { + private static class Builder implements SourceOptions, TableOptions, RangeOptions, Options { private List families = new ArrayList<>(); private List qualifiers = new ArrayList<>(); - private Scanner scanner; - private ClientContext ctx; - private String table; - private Table.ID userTableId; + private Connector conn; + private String table = MetadataTable.NAME; + private Range range; private EnumSet fetchedCols = EnumSet.noneOf(FetchedColumns.class); - private Text startRow; private Text endRow; + private boolean checkConsistency = false; + private boolean saveKeyValues; @Override - public ColumnOptions fetchFiles() { + public Options fetchFiles() { fetchedCols.add(FetchedColumns.FILES); families.add(DataFileColumnFamily.NAME); return this; } @Override - public ColumnOptions fetchLoaded() { + public Options fetchScans() { + fetchedCols.add(FetchedColumns.SCANS); + families.add(ScanFileColumnFamily.NAME); + return this; + } + + @Override + public Options fetchLoaded() { fetchedCols.add(FetchedColumns.LOADED); - families.add(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME); + families.add(BulkFileColumnFamily.NAME); return this; } @Override - public ColumnOptions fetchLocation() { + public Options fetchLocation() { fetchedCols.add(FetchedColumns.LOCATION); families.add(CurrentLocationColumnFamily.NAME); 
families.add(FutureLocationColumnFamily.NAME); @@ -143,31 +183,49 @@ public class MetadataScanner { } @Override - public ColumnOptions fetchPrev() { + public Options fetchPrev() { fetchedCols.add(FetchedColumns.PREV_ROW); qualifiers.add(PREV_ROW_COLUMN); return this; } @Override - public ColumnOptions fetchLast() { + public Options fetchDir() { + fetchedCols.add(FetchedColumns.DIR); + qualifiers.add(DIRECTORY_COLUMN); + return this; + } + + @Override + public Options fetchLast() { fetchedCols.add(FetchedColumns.LAST); families.add(LastLocationColumnFamily.NAME); return this; } @Override - public Iterable build() + public Options fetchTime() { + fetchedCols.add(FetchedColumns.TIME); + qualifiers.add(TIME_COLUMN); + return this; + } + + @Override + public Options fetchCloned() { + fetchedCols.add(FetchedColumns.CLONED); + families.add(ClonedColumnFamily.NAME); + return this; + } + + @Override + public MetadataScanner build() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - if (ctx != null) { - scanner = new IsolatedScanner( - ctx.getConnector().createScanner(table, Authorizations.EMPTY)); - } else if (!(scanner instanceof IsolatedScanner)) { - scanner = new IsolatedScanner(scanner); - } - if (userTableId != null) { - scanner.setRange(new KeyExtent(userTableId, null, startRow).toMetadataRange()); + Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY)); + scanner.setRange(range); + + if (checkConsistency && !fetchedCols.contains(FetchedColumns.PREV_ROW)) { + fetchPrev(); } for (Text fam : families) { @@ -182,69 +240,101 @@ public class MetadataScanner { fetchedCols = EnumSet.allOf(FetchedColumns.class); } - Iterable tmi = TabletMetadata.convert(scanner, fetchedCols); + Iterable tmi = TabletMetadata.convert(scanner, fetchedCols, checkConsistency, + saveKeyValues); if (endRow != null) { // create an iterable that will stop at the tablet which contains the endRow - return new Iterable() { - @Override - public 
Iterator iterator() { - return new TabletMetadataIterator(tmi.iterator(), endRow); - } - }; + return new MetadataScanner(scanner, + () -> new TabletMetadataIterator(tmi.iterator(), endRow)); } else { - return tmi; + return new MetadataScanner(scanner, tmi); } + } + @Override + public TableOptions from(ClientContext ctx) { + try { + this.conn = ctx.getConnector(); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + return this; } @Override - public ColumnOptions overRootTable() { - this.table = RootTable.NAME; + public TableOptions from(Connector conn) { + this.conn = conn; return this; } @Override - public ColumnOptions overMetadataTable() { - this.table = MetadataTable.NAME; + public Options checkConsistency() { + this.checkConsistency = true; return this; } @Override - public ColumnOptions overUserTableId(Table.ID tableId) { - Preconditions - .checkArgument(!tableId.equals(RootTable.ID) && !tableId.equals(MetadataTable.ID)); + public Options saveKeyValues() { + this.saveKeyValues = true; + return this; + } - this.table = MetadataTable.NAME; - this.userTableId = tableId; + @Override + public Options overTabletRange() { + this.range = TabletsSection.getRange(); return this; } @Override - public TableOptions from(Scanner scanner) { - this.scanner = scanner; + public Options overRange(Range range) { + this.range = range; return this; } @Override - public TableOptions from(ClientContext ctx) { - this.ctx = ctx; + public Options overRange(ID tableId) { + this.range = TabletsSection.getRange(tableId); return this; } @Override - public ColumnOptions overUserTableId(Table.ID tableId, Text startRow, Text endRow) { - this.table = MetadataTable.NAME; - this.userTableId = tableId; - this.startRow = startRow; + public Options overRange(ID tableId, Text startRow, Text endRow) { + this.range = new KeyExtent(tableId, null, startRow).toMetadataRange(); this.endRow = endRow; return this; } + @Override + public RangeOptions 
scanTable(String tableName) { + this.table = tableName; + return this; + } + } + + private Scanner scanner; + private Iterable tablets; + + private MetadataScanner(Scanner scanner, Iterable tmi) { + this.scanner = scanner; + this.tablets = tmi; } public static SourceOptions builder() { return new Builder(); } + @Override + public Iterator iterator() { + return tablets.iterator(); + } + + public Stream stream() { + return StreamSupport.stream(tablets.spliterator(), false); + } + + @Override + public void close() { + scanner.close(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 5309d6a..b51526a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -75,7 +75,8 @@ public class MetadataSchema { * {@link #PREV_ROW_COLUMN} sits in this and that needs to sort last because the * SimpleGarbageCollector relies on this. 
*/ - public static final Text NAME = new Text("~tab"); + public static final String STR_NAME = "~tab"; + public static final Text NAME = new Text(STR_NAME); /** * README : very important that prevRow sort last to avoid race conditions between garbage * collector and split this needs to sort after everything else for that tablet @@ -95,7 +96,8 @@ public class MetadataSchema { * Column family for recording information used by the TServer */ public static class ServerColumnFamily { - public static final Text NAME = new Text("srv"); + public static final String STR_NAME = "srv"; + public static final Text NAME = new Text(STR_NAME); /** * Holds the location of the tablet in the DFS file system */ @@ -124,27 +126,31 @@ public class MetadataSchema { * that it was assigned */ public static class CurrentLocationColumnFamily { - public static final Text NAME = new Text("loc"); + public static final String STR_NAME = "loc"; + public static final Text NAME = new Text(STR_NAME); } /** * Column family for storing the assigned location */ public static class FutureLocationColumnFamily { - public static final Text NAME = new Text("future"); + public static final String STR_NAME = "future"; + public static final Text NAME = new Text(STR_NAME); } /** * Column family for storing last location, as a hint for assignment */ public static class LastLocationColumnFamily { - public static final Text NAME = new Text("last"); + public static final String STR_NAME = "last"; + public static final Text NAME = new Text(STR_NAME); } /** * Column family for storing suspension location, as a demand for assignment. 
*/ public static class SuspendLocationColumn { + public static final String STR_NAME = "suspend"; public static final ColumnFQ SUSPEND_COLUMN = new ColumnFQ(new Text("suspend"), new Text("loc")); } @@ -153,21 +159,24 @@ public class MetadataSchema { * Temporary markers that indicate a tablet loaded a bulk file */ public static class BulkFileColumnFamily { - public static final Text NAME = new Text("loaded"); + public static final String STR_NAME = "loaded"; + public static final Text NAME = new Text(STR_NAME); } /** * Temporary marker that indicates a tablet was successfully cloned */ public static class ClonedColumnFamily { - public static final Text NAME = new Text("!cloned"); + public static final String STR_NAME = "!cloned"; + public static final Text NAME = new Text(STR_NAME); } /** * Column family for storing files used by a tablet */ public static class DataFileColumnFamily { - public static final Text NAME = new Text("file"); + public static final String STR_NAME = "file"; + public static final Text NAME = new Text(STR_NAME); } /** @@ -175,14 +184,16 @@ public class MetadataSchema { * from being deleted */ public static class ScanFileColumnFamily { - public static final Text NAME = new Text("scan"); + public static final String STR_NAME = "scan"; + public static final Text NAME = new Text(STR_NAME); } /** * Column family for storing write-ahead log entries */ public static class LogColumnFamily { - public static final Text NAME = new Text("log"); + public static final String STR_NAME = "log"; + public static final Text NAME = new Text(STR_NAME); } /** diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java new file mode 100644 index 0000000..d22b1e8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletDeletedException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.metadata.schema; + +public class TabletDeletedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public TabletDeletedException(String msg) { + super(msg); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index b101971..e3e268f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -17,6 +17,8 @@ package org.apache.accumulo.core.metadata.schema; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; import java.util.EnumSet; @@ -25,45 +27,61 @@ import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.SortedMap; +import 
java.util.function.Function; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.util.HostAndPort; import org.apache.hadoop.io.Text; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Iterators; public class TabletMetadata { private Table.ID tableId; private Text prevEndRow; + private boolean sawPrevEndRow = false; private Text endRow; private Location location; private List files; + private List scans; private Set 
loadedFiles; - private EnumSet fetchedColumns; + private EnumSet fetchedCols; private KeyExtent extent; private Location last; + private String dir; + private String time; + private String cloned; + private SortedMap keyValues; public static enum LocationType { CURRENT, FUTURE, LAST } public static enum FetchedColumns { - LOCATION, PREV_ROW, FILES, LAST, LOADED + LOCATION, PREV_ROW, FILES, LAST, LOADED, SCANS, DIR, TIME, CLONED } public static class Location { @@ -101,47 +119,81 @@ public class TabletMetadata { return extent; } + private void ensureFetched(FetchedColumns col) { + Preconditions.checkState(fetchedCols.contains(col), "%s was not fetched", col); + } + public Text getPrevEndRow() { - Preconditions.checkState(fetchedColumns.contains(FetchedColumns.PREV_ROW), - "Requested prev row when it was not fetched"); + ensureFetched(FetchedColumns.PREV_ROW); return prevEndRow; } + public boolean sawPrevEndRow() { + ensureFetched(FetchedColumns.PREV_ROW); + return sawPrevEndRow; + } + public Text getEndRow() { return endRow; } public Location getLocation() { - Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOCATION), - "Requested location when it was not fetched"); + ensureFetched(FetchedColumns.LOCATION); return location; } public Set getLoaded() { - Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LOADED), - "Requested loaded when it was not fetched"); + ensureFetched(FetchedColumns.LOADED); return loadedFiles; } public Location getLast() { - Preconditions.checkState(fetchedColumns.contains(FetchedColumns.LAST), - "Requested last when it was not fetched"); + ensureFetched(FetchedColumns.LAST); return last; } public List getFiles() { - Preconditions.checkState(fetchedColumns.contains(FetchedColumns.FILES), - "Requested files when it was not fetched"); + ensureFetched(FetchedColumns.FILES); return files; } - public static TabletMetadata convertRow(Iterator> rowIter, - EnumSet fetchedColumns) { + public List getScans() { + 
ensureFetched(FetchedColumns.SCANS); + return scans; + } + + public String getDir() { + ensureFetched(FetchedColumns.DIR); + return dir; + } + + public String getTime() { + ensureFetched(FetchedColumns.TIME); + return time; + } + + public String getCloned() { + ensureFetched(FetchedColumns.CLONED); + return cloned; + } + + public SortedMap getKeyValues() { + Preconditions.checkState(keyValues != null, "Requested key values when it was not saved"); + return keyValues; + } + + private static TabletMetadata convertRow(Iterator> rowIter, + EnumSet fetchedColumns, boolean buildKeyValueMap) { Objects.requireNonNull(rowIter); TabletMetadata te = new TabletMetadata(); + ImmutableSortedMap.Builder kvBuilder = null; + if (buildKeyValueMap) { + kvBuilder = ImmutableSortedMap.naturalOrder(); + } Builder filesBuilder = ImmutableList.builder(); + Builder scansBuilder = ImmutableList.builder(); final ImmutableSet.Builder loadedFilesBuilder = ImmutableSet.builder(); ByteSequence row = null; @@ -151,6 +203,10 @@ public class TabletMetadata { Value v = kv.getValue(); Text fam = k.getColumnFamily(); + if (buildKeyValueMap) { + kvBuilder.put(k, v); + } + if (row == null) { row = k.getRowData(); KeyExtent ke = new KeyExtent(k.getRow(), (Text) null); @@ -161,47 +217,95 @@ public class TabletMetadata { "Input contains more than one row : " + row + " " + k.getRowData()); } - if (PREV_ROW_COLUMN.hasColumns(k)) { - te.prevEndRow = KeyExtent.decodePrevEndRow(v); - } - if (fam.equals(DataFileColumnFamily.NAME)) { - filesBuilder.add(k.getColumnQualifier().toString()); - } else if (fam.equals(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME)) { - loadedFilesBuilder.add(k.getColumnQualifier().toString()); - } else if (fam.equals(CurrentLocationColumnFamily.NAME)) { - if (te.location != null) { - throw new IllegalArgumentException( - "Input contains more than one location " + te.location + " " + v); - } - te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), - 
LocationType.CURRENT); - } else if (fam.equals(FutureLocationColumnFamily.NAME)) { - if (te.location != null) { - throw new IllegalArgumentException( - "Input contains more than one location " + te.location + " " + v); - } - te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), - LocationType.FUTURE); - } else if (fam.equals(LastLocationColumnFamily.NAME)) { - te.last = new Location(v.toString(), k.getColumnQualifierData().toString(), - LocationType.LAST); + switch (fam.toString()) { + case TabletColumnFamily.STR_NAME: + if (PREV_ROW_COLUMN.hasColumns(k)) { + te.prevEndRow = KeyExtent.decodePrevEndRow(v); + te.sawPrevEndRow = true; + } + break; + case ServerColumnFamily.STR_NAME: + if (DIRECTORY_COLUMN.hasColumns(k)) { + te.dir = v.toString(); + } else if (TIME_COLUMN.hasColumns(k)) { + te.time = v.toString(); + } + break; + case DataFileColumnFamily.STR_NAME: + filesBuilder.add(k.getColumnQualifier().toString()); + break; + case BulkFileColumnFamily.STR_NAME: + loadedFilesBuilder.add(k.getColumnQualifier().toString()); + break; + case CurrentLocationColumnFamily.STR_NAME: + if (te.location != null) { + throw new IllegalArgumentException( + "Input contains more than one location " + te.location + " " + v); + } + te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), + LocationType.CURRENT); + break; + case FutureLocationColumnFamily.STR_NAME: + if (te.location != null) { + throw new IllegalArgumentException( + "Input contains more than one location " + te.location + " " + v); + } + te.location = new Location(v.toString(), k.getColumnQualifierData().toString(), + LocationType.FUTURE); + break; + case LastLocationColumnFamily.STR_NAME: + te.last = new Location(v.toString(), k.getColumnQualifierData().toString(), + LocationType.LAST); + break; + case ScanFileColumnFamily.STR_NAME: + scansBuilder.add(k.getColumnQualifierData().toString()); + break; + case ClonedColumnFamily.STR_NAME: + te.cloned = v.toString(); + 
break; + default: + throw new IllegalStateException("Unexpected family " + fam); } } te.files = filesBuilder.build(); te.loadedFiles = loadedFilesBuilder.build(); - te.fetchedColumns = fetchedColumns; + te.fetchedCols = fetchedColumns; + te.scans = scansBuilder.build(); + if (buildKeyValueMap) { + te.keyValues = kvBuilder.build(); + } return te; } - public static Iterable convert(Scanner input, - EnumSet fetchedColumns) { - return new Iterable() { - @Override - public Iterator iterator() { + static Iterable convert(Scanner input, EnumSet fetchedColumns, + boolean checkConsistency, boolean buildKeyValueMap) { + + Range range = input.getRange(); + + Function> iterFactory = r -> { + synchronized (input) { + input.setRange(r); RowIterator rowIter = new RowIterator(input); - return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns)); + return Iterators.transform(rowIter, ri -> convertRow(ri, fetchedColumns, buildKeyValueMap)); } }; + + if (checkConsistency) { + return () -> new LinkingIterator(iterFactory, range); + } else { + return () -> iterFactory.apply(range); + } + } + + @VisibleForTesting + static TabletMetadata create(String id, String prevEndRow, String endRow) { + TabletMetadata te = new TabletMetadata(); + te.tableId = Table.ID.of(id); + te.sawPrevEndRow = true; + te.prevEndRow = prevEndRow == null ? null : new Text(prevEndRow); + te.endRow = endRow == null ? 
null : new Text(endRow); + te.fetchedCols = EnumSet.of(FetchedColumns.PREV_ROW); + return te; } } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index 9df6cef..0799510 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@ -38,7 +38,6 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -162,11 +161,12 @@ public class Gatherer { throws TableNotFoundException, AccumuloException, AccumuloSecurityException { Iterable tmi = MetadataScanner.builder().from(ctx) - .overUserTableId(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast() - .fetchPrev().build(); + .overRange(tableId, startRow, endRow).fetchFiles().fetchLocation().fetchLast().fetchPrev() + .build(); // get a subset of files Map> files = new HashMap<>(); + for (TabletMetadata tm : tmi) { for (String file : tm.getFiles()) { if (fileSelector.test(file)) { @@ -522,10 +522,8 @@ public class Gatherer { private int countFiles() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { // TODO use a batch scanner + iterator to parallelize counting files - Iterable tmi = MetadataScanner.builder().from(ctx) - .overUserTableId(tableId, startRow, endRow).fetchFiles().fetchPrev().build(); - return StreamSupport.stream(tmi.spliterator(), false).mapToInt(tm -> tm.getFiles().size()) - .sum(); + return MetadataScanner.builder().from(ctx).overRange(tableId, startRow, endRow).fetchFiles() + .fetchPrev().build().stream().mapToInt(tm -> tm.getFiles().size()).sum(); } private class GatherRequest implements Supplier { diff --git 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java new file mode 100644 index 0000000..a12297b --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/LinkingIteratorTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.metadata.schema; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.create; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class LinkingIteratorTest { + + private static class IterFactory implements Function> { + private int count; + private List initial; + private List subsequent; + + IterFactory(List initial, List subsequent) { + this.initial = initial; + this.subsequent = subsequent; + count = 0; + } + + @Override + public Iterator apply(Range range) { + Stream stream = count++ == 0 ? initial.stream() : subsequent.stream(); + return stream.filter(tm -> range.contains(new Key(tm.getExtent().getMetadataEntry()))) + .iterator(); + } + } + + private static void check(List expected, IterFactory iterFactory) { + List actual = new ArrayList<>(); + new LinkingIterator(iterFactory, new Range()) + .forEachRemaining(tm -> actual.add(tm.getExtent())); + Assert.assertEquals(Lists.transform(expected, TabletMetadata::getExtent), actual); + } + + @Test + public void testHole() { + + List tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"), + create("4", "r", "x"), create("4", "x", null)); + List tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"), + create("4", "m", "r"), create("4", "r", "x"), create("4", "x", null)); + + check(tablets2, new IterFactory(tablets1, tablets2)); + } + + @Test(expected = TabletDeletedException.class) + public void testMerge() { + // test for case when a tablet is merged away + List tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"), + create("4", "f", "r"), 
create("4", "x", null)); + List tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "r"), + create("4", "r", "x"), create("4", "x", null)); + + LinkingIterator li = new LinkingIterator(new IterFactory(tablets1, tablets2), new Range()); + + while (li.hasNext()) { + li.next(); + } + } + + @Test + public void testBadTableTransition1() { + // test when last tablet in table does not have null end row + List tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"), + create("5", null, null)); + List tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", "m"), + create("4", "m", null), create("5", null, null)); + + check(tablets2, new IterFactory(tablets1, tablets2)); + } + + @Test + public void testBadTableTransition2() { + // test when first tablet in table does not have null prev end row + List tablets1 = Arrays.asList(create("4", null, "f"), create("4", "f", null), + create("5", "h", null)); + List tablets2 = Arrays.asList(create("4", null, "f"), create("4", "f", null), + create("5", null, "h"), create("5", "h", null)); + + check(tablets2, new IterFactory(tablets1, tablets2)); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index d93de1b..efaf6f0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataScanner; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; @@ -70,6 +71,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletDeletedException; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -714,15 +717,12 @@ public class MetadataTableUtil { } } - private static void getFiles(Set files, Map tablet, Table.ID srcTableId) { - for (Entry entry : tablet.entrySet()) { - if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { - String cf = entry.getKey().getColumnQualifier().toString(); - if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) { - cf = "../" + srcTableId + entry.getKey().getColumnQualifier(); - } - files.add(cf); + private static void getFiles(Set files, List tabletFiles, Table.ID srcTableId) { + for (String file : tabletFiles) { + if (srcTableId != null && !file.startsWith("../") && !file.contains(":")) { + file = "../" + srcTableId + file; } + files.add(file); } } @@ -753,37 +753,43 @@ public class MetadataTableUtil { return m; } - private static Scanner createCloneScanner(String tableName, Table.ID tableId, Connector conn) - throws TableNotFoundException { - if (tableId.equals(MetadataTable.ID)) + private static Iterable createCloneScanner(String testTableName, Table.ID tableId, + Connector conn) throws TableNotFoundException { + + String tableName; + Range range; + + if (testTableName != null) { + tableName = 
testTableName; + range = TabletsSection.getRange(tableId); + } else if (tableId.equals(MetadataTable.ID)) { tableName = RootTable.NAME; - Scanner mscanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY)); - mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - mscanner.fetchColumnFamily(DataFileColumnFamily.NAME); - mscanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); - mscanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME); - mscanner.fetchColumnFamily(ClonedColumnFamily.NAME); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(mscanner); - TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(mscanner); - return mscanner; + range = TabletsSection.getRange(); + } else { + tableName = MetadataTable.NAME; + range = TabletsSection.getRange(tableId); + } + + try { + return MetadataScanner.builder().from(conn).scanTable(tableName).overRange(range) + .checkConsistency().saveKeyValues().fetchFiles().fetchLocation().fetchLast().fetchCloned() + .fetchPrev().fetchTime().build(); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } } @VisibleForTesting - public static void initializeClone(String tableName, Table.ID srcTableId, Table.ID tableId, + public static void initializeClone(String testTableName, Table.ID srcTableId, Table.ID tableId, Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException { - TabletIterator ti; - if (srcTableId.equals(MetadataTable.ID)) - ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), new Range(), true, - true); - else - ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), - new KeyExtent(srcTableId, null, null).toMetadataRange(), true, true); + + Iterator ti = createCloneScanner(testTableName, srcTableId, conn).iterator(); if (!ti.hasNext()) throw new RuntimeException(" table deleted during clone? 
srcTableId = " + srcTableId); while (ti.hasNext()) - bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next())); + bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next().getKeyValues())); bw.flush(); } @@ -794,12 +800,13 @@ public class MetadataTableUtil { } @VisibleForTesting - public static int checkClone(String tableName, Table.ID srcTableId, Table.ID tableId, + public static int checkClone(String testTableName, Table.ID srcTableId, Table.ID tableId, Connector conn, BatchWriter bw) throws TableNotFoundException, MutationsRejectedException { - TabletIterator srcIter = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), - new KeyExtent(srcTableId, null, null).toMetadataRange(), true, true); - TabletIterator cloneIter = new TabletIterator(createCloneScanner(tableName, tableId, conn), - new KeyExtent(tableId, null, null).toMetadataRange(), true, true); + + Iterator srcIter = createCloneScanner(testTableName, srcTableId, conn) + .iterator(); + Iterator cloneIter = createCloneScanner(testTableName, tableId, conn) + .iterator(); if (!cloneIter.hasNext() || !srcIter.hasNext()) throw new RuntimeException( @@ -808,50 +815,40 @@ public class MetadataTableUtil { int rewrites = 0; while (cloneIter.hasNext()) { - Map cloneTablet = cloneIter.next(); - Text cloneEndRow = new KeyExtent(cloneTablet.keySet().iterator().next().getRow(), (Text) null) - .getEndRow(); + TabletMetadata cloneTablet = cloneIter.next(); + Text cloneEndRow = cloneTablet.getEndRow(); HashSet cloneFiles = new HashSet<>(); - boolean cloneSuccessful = false; - for (Entry entry : cloneTablet.entrySet()) { - if (entry.getKey().getColumnFamily().equals(ClonedColumnFamily.NAME)) { - cloneSuccessful = true; - break; - } - } + boolean cloneSuccessful = cloneTablet.getCloned() != null; if (!cloneSuccessful) - getFiles(cloneFiles, cloneTablet, null); + getFiles(cloneFiles, cloneTablet.getFiles(), null); - List> srcTablets = new ArrayList<>(); - Map srcTablet = srcIter.next(); + List 
srcTablets = new ArrayList<>(); + TabletMetadata srcTablet = srcIter.next(); srcTablets.add(srcTablet); - Text srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null) - .getEndRow(); - + Text srcEndRow = srcTablet.getEndRow(); int cmp = compareEndRows(cloneEndRow, srcEndRow); if (cmp < 0) - throw new TabletIterator.TabletDeletedException( + throw new TabletDeletedException( "Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow); HashSet srcFiles = new HashSet<>(); if (!cloneSuccessful) - getFiles(srcFiles, srcTablet, srcTableId); + getFiles(srcFiles, srcTablet.getFiles(), srcTableId); while (cmp > 0) { srcTablet = srcIter.next(); srcTablets.add(srcTablet); - srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null) - .getEndRow(); + srcEndRow = srcTablet.getEndRow(); cmp = compareEndRows(cloneEndRow, srcEndRow); if (cmp < 0) - throw new TabletIterator.TabletDeletedException( + throw new TabletDeletedException( "Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow); if (!cloneSuccessful) - getFiles(srcFiles, srcTablet, srcTableId); + getFiles(srcFiles, srcTablet.getFiles(), srcTableId); } if (cloneSuccessful) @@ -859,22 +856,22 @@ public class MetadataTableUtil { if (!srcFiles.containsAll(cloneFiles)) { // delete existing cloned tablet entry - Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow()); + Mutation m = new Mutation(cloneTablet.getExtent().getMetadataEntry()); - for (Entry entry : cloneTablet.entrySet()) { + for (Entry entry : cloneTablet.getKeyValues().entrySet()) { Key k = entry.getKey(); m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp()); } bw.addMutation(m); - for (Map st : srcTablets) - bw.addMutation(createCloneMutation(srcTableId, tableId, st)); + for (TabletMetadata st : srcTablets) + bw.addMutation(createCloneMutation(srcTableId, tableId, st.getKeyValues())); rewrites++; } else { // write out marker that 
this tablet was successfully cloned - Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow()); + Mutation m = new Mutation(cloneTablet.getExtent().getMetadataEntry()); m.put(ClonedColumnFamily.NAME, new Text(""), new Value("OK".getBytes(UTF_8))); bw.addMutation(m); } @@ -893,13 +890,13 @@ public class MetadataTableUtil { while (true) { try { - initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); + initializeClone(null, srcTableId, tableId, conn, bw); // the following loop looks changes in the file that occurred during the copy.. if files // were dereferenced then they could have been GCed while (true) { - int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); + int rewrites = checkClone(null, srcTableId, tableId, conn, bw); if (rewrites == 0) break; @@ -908,7 +905,7 @@ public class MetadataTableUtil { bw.flush(); break; - } catch (TabletIterator.TabletDeletedException tde) { + } catch (TabletDeletedException tde) { // tablets were merged in the src table bw.flush(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java deleted file mode 100644 index e65fd71..0000000 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.util; - -import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; - -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.impl.Table; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Iterators; - -/** - * This class iterates over the metadata table returning all key values for a tablet in one chunk. - * As it scans the metadata table it checks the correctness of the metadata table, and rescans if - * needed. So the tablet key/values returned by this iterator should satisfy the sorted linked list - * property of the metadata table. - * - * The purpose of this is to hide inconsistencies caused by splits and detect anomalies in the - * metadata table. 
- * - * If a tablet that was returned by this iterator is subsequently deleted from the metadata table, - * then this iterator will throw a TabletDeletedException. This could occur when a table is merged. - */ -public class TabletIterator implements Iterator> { - - private static final Logger log = LoggerFactory.getLogger(TabletIterator.class); - - private SortedMap currentTabletKeys; - - private Text lastTablet; - - private Scanner scanner; - private Iterator> iter; - - private boolean returnPrevEndRow; - - private boolean returnDir; - - private Range range; - - public static class TabletDeletedException extends RuntimeException { - - private static final long serialVersionUID = 1L; - - public TabletDeletedException(String msg) { - super(msg); - } - } - - /** - * - * @param s - * A scanner over the entire metadata table configure to fetch needed columns. - */ - public TabletIterator(Scanner s, Range range, boolean returnPrevEndRow, boolean returnDir) { - this.scanner = s; - this.range = range; - this.scanner.setRange(range); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - this.iter = s.iterator(); - this.returnPrevEndRow = returnPrevEndRow; - this.returnDir = returnDir; - } - - @Override - public boolean hasNext() { - while (currentTabletKeys == null) { - - currentTabletKeys = scanToPrevEndRow(); - if (currentTabletKeys.size() == 0) { - break; - } - - Key prevEndRowKey = currentTabletKeys.lastKey(); - Value prevEndRowValue = currentTabletKeys.get(prevEndRowKey); - - if (!TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(prevEndRowKey)) { - log.debug("{}", currentTabletKeys); - throw new RuntimeException("Unexpected key " + prevEndRowKey); - } - - Text per = KeyExtent.decodePrevEndRow(prevEndRowValue); - Text lastEndRow; - - if (lastTablet == null) { - lastEndRow = null; - } else { - lastEndRow = new KeyExtent(lastTablet, (Text) null).getEndRow(); - - // do table 
transition sanity check - Table.ID lastTable = new KeyExtent(lastTablet, (Text) null).getTableId(); - Table.ID currentTable = new KeyExtent(prevEndRowKey.getRow(), (Text) null).getTableId(); - - if (!lastTable.equals(currentTable) && (per != null || lastEndRow != null)) { - log.info("Metadata inconsistency on table transition : {} {} {} {}", lastTable, - currentTable, per, lastEndRow); - - currentTabletKeys = null; - resetScanner(); - - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); - - continue; - } - } - - boolean perEqual = (per == null && lastEndRow == null) - || (per != null && lastEndRow != null && per.equals(lastEndRow)); - - if (!perEqual) { - - log.info("Metadata inconsistency : {} != {} metadataKey = {}", per, lastEndRow, - prevEndRowKey); - - currentTabletKeys = null; - resetScanner(); - - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); - - continue; - - } - // this tablet is good, so set it as the last tablet - lastTablet = prevEndRowKey.getRow(); - } - - return currentTabletKeys.size() > 0; - } - - @Override - public Map next() { - - if (!hasNext()) - throw new NoSuchElementException(); - - Map tmp = currentTabletKeys; - currentTabletKeys = null; - - Set> es = tmp.entrySet(); - Iterator> esIter = es.iterator(); - - while (esIter.hasNext()) { - Map.Entry entry = esIter.next(); - if (!returnPrevEndRow - && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) { - esIter.remove(); - } - - if (!returnDir - && TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(entry.getKey())) { - esIter.remove(); - } - } - - return tmp; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - private SortedMap scanToPrevEndRow() { - - Text curMetaDataRow = null; - - TreeMap tm = new TreeMap<>(); - - boolean sawPrevEndRow = false; - - while (true) { - while (iter.hasNext()) { - Entry entry = iter.next(); - - if (curMetaDataRow == null) { - curMetaDataRow = entry.getKey().getRow(); - } - - if 
(!curMetaDataRow.equals(entry.getKey().getRow())) { - // tablet must not have a prev end row, try scanning again - break; - } - - tm.put(entry.getKey(), entry.getValue()); - - if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) { - sawPrevEndRow = true; - break; - } - } - - if (!sawPrevEndRow && tm.size() > 0) { - log.warn("Metadata problem : tablet {} has no prev end row", curMetaDataRow); - resetScanner(); - curMetaDataRow = null; - tm.clear(); - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); - } else { - break; - } - } - - return tm; - } - - protected void resetScanner() { - - Range range; - - if (lastTablet == null) { - range = this.range; - } else { - // check to see if the last tablet still exist - range = new Range(lastTablet, true, lastTablet, true); - scanner.setRange(range); - int count = Iterators.size(scanner.iterator()); - - if (count == 0) - throw new TabletDeletedException("Tablet " + lastTablet + " was deleted while iterating"); - - // start right after the last good tablet - range = new Range(new Key(lastTablet).followingKey(PartialKey.ROW), true, - this.range.getEndKey(), this.range.isEndKeyInclusive()); - } - - log.info("Resetting {} scanner to {}", MetadataTable.NAME, range); - - scanner.setRange(range); - iter = scanner.iterator(); - - } - -} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java index 04f0c1a..7733298 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java @@ -33,18 +33,12 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.Table; -import org.apache.accumulo.core.data.Key; -import 
org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.gc.GarbageCollectionEnvironment.Reference; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; -import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,21 +164,17 @@ public class GarbageCollectionAlgorithm { } - Iterator> iter = gce.getReferenceIterator(); + Iterator iter = gce.getReferences().iterator(); while (iter.hasNext()) { - Entry entry = iter.next(); - Key key = entry.getKey(); - Text cft = key.getColumnFamily(); - - if (cft.equals(DataFileColumnFamily.NAME) || cft.equals(ScanFileColumnFamily.NAME)) { - String cq = key.getColumnQualifier().toString(); - - String reference = cq; - if (cq.startsWith("/")) { - String tableID = new String(KeyExtent.tableOfMetadataRow(key.getRow())); - reference = "/" + tableID + cq; - } else if (!cq.contains(":") && !cq.startsWith("../")) { - throw new RuntimeException("Bad file reference " + cq); + Reference ref = iter.next(); + + if (!ref.isDir) { + + String reference = ref.ref; + if (reference.startsWith("/")) { + reference = "/" + ref.id + reference; + } else if (!reference.contains(":") && !reference.startsWith("../")) { + throw new RuntimeException("Bad file reference " + reference); } reference = makeRelative(reference, 3); @@ -198,9 +188,9 @@ public class GarbageCollectionAlgorithm { if (candidateMap.remove(dir) != null) log.debug("Candidate was still in use: {}", 
reference); - } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - String tableID = new String(KeyExtent.tableOfMetadataRow(key.getRow())); - String dir = entry.getValue().toString(); + } else { + String tableID = ref.id.toString(); + String dir = ref.ref; if (!dir.contains(":")) { if (!dir.startsWith("/")) throw new RuntimeException("Bad directory " + dir); @@ -211,9 +201,7 @@ public class GarbageCollectionAlgorithm { if (candidateMap.remove(dir) != null) log.debug("Candidate was still in use: {}", dir); - } else - throw new RuntimeException( - "Scanner over metadata table returned unexpected column : " + entry.getKey()); + } } confirmDeletesFromReplication(gce.getReplicationNeededIterator(), diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java index f9abfa1..1a8ec59 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java @@ -23,11 +23,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; +import java.util.stream.Stream; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.Table; +import org.apache.accumulo.core.client.impl.Table.ID; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; @@ -62,6 +64,18 @@ public interface GarbageCollectionEnvironment { Iterator getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException; + static class Reference { + public final ID id; + public final String ref; + public final boolean isDir; + + Reference(ID id, 
String ref, boolean isDir) { + this.id = id; + this.ref = ref; + this.isDir = isDir; + } + } + /** * Fetches the references to files, {@link DataFileColumnFamily#NAME} or * {@link ScanFileColumnFamily#NAME}, from tablets @@ -69,7 +83,7 @@ public interface GarbageCollectionEnvironment { * @return An {@link Iterator} of {@link Entry}<{@link Key}, {@link Value}> which constitute * a reference to a file. */ - Iterator> getReferenceIterator() + Stream getReferences() throws TableNotFoundException, AccumuloException, AccumuloSecurityException; /** diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 7ac9b59..3c60861 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -30,6 +30,7 @@ import java.util.SortedMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; @@ -58,10 +59,9 @@ import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataScanner; import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import 
org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTableOfflineException; @@ -102,7 +102,6 @@ import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.Halt; -import org.apache.accumulo.server.util.TabletIterator; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -297,18 +296,23 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa } @Override - public Iterator> getReferenceIterator() + public Stream getReferences() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - IsolatedScanner scanner = new IsolatedScanner( - getConnector().createScanner(tableName, Authorizations.EMPTY)); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.fetchColumnFamily(ScanFileColumnFamily.NAME); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - TabletIterator tabletIterator = new TabletIterator(scanner, - MetadataSchema.TabletsSection.getRange(), false, true); - - return Iterators - .concat(Iterators.transform(tabletIterator, input -> input.entrySet().iterator())); + + Stream tabletStream = MetadataScanner.builder().from(getConnector()) + .overTabletRange().checkConsistency().fetchDir().fetchFiles().fetchScans().build() + .stream(); + + Stream refStream = tabletStream.flatMap(tm -> { + Stream refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream()) + .map(f -> new Reference(tm.getTableId(), f, false)); + if (tm.getDir() != null) { + refs = Stream.concat(refs, Stream.of(new Reference(tm.getTableId(), tm.getDir(), true))); + } + return refs; + }); + + return refStream; } @Override diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java index a719378..6bedd68 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java @@ -27,18 +27,13 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Stream; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.impl.Table; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; -import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -46,7 +41,7 @@ public class GarbageCollectionTest { static class TestGCE implements GarbageCollectionEnvironment { TreeSet candidates = new TreeSet<>(); ArrayList blips = new ArrayList<>(); - Map references = new TreeMap<>(); + Map references = new TreeMap<>(); HashSet tableIds = new HashSet<>(); ArrayList deletes = new ArrayList<>(); @@ -69,8 +64,8 @@ public class GarbageCollectionTest { } @Override - public Iterator> getReferenceIterator() { - return references.entrySet().iterator(); + public Stream getReferences() { + return references.values().stream(); } @Override @@ -89,41 +84,21 @@ public class GarbageCollectionTest { tablesDirsToDelete.add(tableID); } - public Key newFileReferenceKey(String tableId, String endRow, String file) { - String row = new KeyExtent(Table.ID.of(tableId), endRow == null ? 
null : new Text(endRow), - null).getMetadataEntry().toString(); - String cf = MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString(); - return new Key(row, cf, file); + public void addFileReference(String tableId, String endRow, String file) { + references.put(tableId + ":" + endRow + ":" + file, + new Reference(Table.ID.of(tableId), file, false)); } - public Value addFileReference(String tableId, String endRow, String file) { - Key key = newFileReferenceKey(tableId, endRow, file); - Value val = new Value(new DataFileValue(0, 0).encode()); - return references.put(key, val); + public void removeFileReference(String tableId, String endRow, String file) { + references.remove(tableId + ":" + endRow + ":" + file); } - public Value removeFileReference(String tableId, String endRow, String file) { - return references.remove(newFileReferenceKey(tableId, endRow, file)); + public void addDirReference(String tableId, String endRow, String dir) { + references.put(tableId + ":" + endRow, new Reference(Table.ID.of(tableId), dir, true)); } - Key newDirReferenceKey(String tableId, String endRow) { - String row = new KeyExtent(Table.ID.of(tableId), endRow == null ? 
null : new Text(endRow), - null).getMetadataEntry().toString(); - String cf = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN - .getColumnFamily().toString(); - String cq = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN - .getColumnQualifier().toString(); - return new Key(row, cf, cq); - } - - public Value addDirReference(String tableId, String endRow, String dir) { - Key key = newDirReferenceKey(tableId, endRow); - Value val = new Value(dir.getBytes()); - return references.put(key, val); - } - - public Value removeDirReference(String tableId, String endRow) { - return references.remove(newDirReferenceKey(tableId, endRow)); + public void removeDirReference(String tableId, String endRow) { + references.remove(tableId + ":" + endRow); } @Override diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java index 41aa3d8..3511c82 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java @@ -68,6 +68,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletDeletedException; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationTable; @@ -92,7 +93,6 @@ import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretM import org.apache.accumulo.server.util.NamespacePropUtil; import 
org.apache.accumulo.server.util.SystemPropUtil; import org.apache.accumulo.server.util.TablePropUtil; -import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java index bfd7fdd..1baac99 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java @@ -107,11 +107,10 @@ class LoadFiles extends MasterRepo { Text startRow = loadMapEntry.getKey().getPrevEndRow(); - Iterable tableMetadata = MetadataScanner.builder().from(master) - .overUserTableId(tableId, startRow, null).fetchPrev().fetchLocation().fetchLoaded().build(); - long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); - Iterator tabletIter = tableMetadata.iterator(); + Iterator tabletIter = MetadataScanner.builder().from(master) + .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation() + .fetchLoaded().build().iterator(); List tablets = new ArrayList<>(); TabletMetadata currentTablet = tabletIter.next(); diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java index 345aa20..18ddab9 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java @@ -166,9 +166,9 @@ public class PrepBulkImport extends MasterRepo { Iterators.transform(lmi, entry -> entry.getKey()); TabletIterFactory 
tabletIterFactory = startRow -> { - Iterable tableMetadata = MetadataScanner.builder().from(master) - .overUserTableId(bulkInfo.tableId, startRow, null).build(); - return Iterators.transform(tableMetadata.iterator(), tm -> tm.getExtent()); + return MetadataScanner.builder().from(master).overRange(bulkInfo.tableId, startRow, null) + .checkConsistency().fetchPrev().build().stream().map(TabletMetadata::getExtent) + .iterator(); }; checkForMerge(bulkInfo.tableId.canonicalID(), diff --git a/test/src/main/java/org/apache/accumulo/test/CloneIT.java b/test/src/main/java/org/apache/accumulo/test/CloneIT.java index 0c4c6a9..aa1b9b3 100644 --- a/test/src/main/java/org/apache/accumulo/test/CloneIT.java +++ b/test/src/main/java/org/apache/accumulo/test/CloneIT.java @@ -35,10 +35,10 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletDeletedException; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.server.util.TabletIterator; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -389,8 +389,6 @@ public class CloneIT extends AccumuloClusterHarness { try { MetadataTableUtil.checkClone(tableName, Table.ID.of("0"), Table.ID.of("1"), conn, bw2); fail(); - } catch (TabletIterator.TabletDeletedException tde) {} - + } catch (TabletDeletedException tde) {} } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java index 68a4b36..d585549 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java @@ -27,28 +27,18 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; -import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Merge; import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.server.util.TabletIterator; -import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException; import org.apache.hadoop.io.Text; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; public class MergeIT extends AccumuloClusterHarness { @@ -217,72 +207,4 @@ public class MergeIT extends AccumuloClusterHarness { } } } - - @Rule - public ExpectedException exception = ExpectedException.none(); - - private static class TestTabletIterator extends TabletIterator { - - private final Connector conn; - private final String metadataTableName; - - public TestTabletIterator(Connector conn, String metadataTableName) throws Exception { - super(conn.createScanner(metadataTableName, Authorizations.EMPTY), - MetadataSchema.TabletsSection.getRange(), true, true); - this.conn = conn; - this.metadataTableName = metadataTableName; - } - - @Override - protected void 
resetScanner() { - try (Scanner ds = conn.createScanner(metadataTableName, Authorizations.EMPTY)) { - - Text tablet = new KeyExtent(Table.ID.of("0"), new Text("m"), null).getMetadataEntry(); - ds.setRange(new Range(tablet, true, tablet, true)); - - Mutation m = new Mutation(tablet); - - BatchWriter bw = conn.createBatchWriter(metadataTableName, new BatchWriterConfig()); - for (Entry entry : ds) { - Key k = entry.getKey(); - m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp()); - } - bw.addMutation(m); - bw.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - super.resetScanner(); - } - - } - - // simulate a merge happening while iterating over tablets - @Test - public void testMerge() throws Exception { - // create a fake metadata table - String metadataTableName = getUniqueNames(1)[0]; - getConnector().tableOperations().create(metadataTableName); - - KeyExtent ke1 = new KeyExtent(Table.ID.of("0"), new Text("m"), null); - Mutation mut1 = ke1.getPrevRowUpdateMutation(); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut1, new Value("/d1".getBytes())); - - KeyExtent ke2 = new KeyExtent(Table.ID.of("0"), null, null); - Mutation mut2 = ke2.getPrevRowUpdateMutation(); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut2, new Value("/d2".getBytes())); - - BatchWriter bw1 = getConnector().createBatchWriter(metadataTableName, new BatchWriterConfig()); - bw1.addMutation(mut1); - bw1.addMutation(mut2); - bw1.close(); - - TestTabletIterator tabIter = new TestTabletIterator(getConnector(), metadataTableName); - - exception.expect(TabletDeletedException.class); - while (tabIter.hasNext()) { - tabIter.next(); - } - } } -- To stop receiving notification emails like this one, please contact kturner@apache.org.