tephra-dev mailing list archives

From anew <...@git.apache.org>
Subject [GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...
Date Sat, 03 Dec 2016 16:21:42 GMT
Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759897
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase tables that use
    + * the coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
    + * In addition, the plugin persists the following information on a run at time <i>t</i>:
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
    + *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress transaction that
    + *     will not have writes in any HBase regions that are created after time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, the number of regions changes constantly due to region creations,
    + * splits and merges. At any given time there may be a region on which a major compaction has not yet been run.
    + * Since the prune upper bound gets recorded for a region only after a major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the set of regions that exists at the time
    + * of each run of the plugin, and use the historical region sets for times <i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> in which all regions have been major compacted,
    + * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
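    + * <p/>
    + * For illustration (hypothetical numbers, not from an actual run): suppose the plugin saved
    + * <i>(t1, {r1, r2})</i> and <i>(t1, 90)</i>, and major compactions recorded prune upper bounds
    + * <i>r1 = 100</i> and <i>r2 = 95</i>. All regions in the <i>t1</i> set then have a recorded
    + * bound, so the plugin returns <i>min(100, 95, 90) = 90</i>.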
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because the <i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a region.
    + * <br/>
    + * However, a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time the region was created.
    + * Since we limit the plugin's prune upper bound using <i>(t1, prune upper bound)</i>, there should be no invalid
    + * transactions smaller than the plugin's prune upper bound with writes in any transactional region of
    + * this HBase instance.
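    + * <br/>
    + * Continuing the illustration above (hypothetical numbers): a region <i>r3</i> created after
    + * time <i>t1</i> cannot have writes from an invalid transaction smaller than 90, so capping
    + * the result at <i>(t1, prune upper bound) = 90</i> keeps the returned value safe even though
    + * <i>r3</i> does not appear in <i>(t1, set of regions)</i>.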
    + *
    + * <p/>
    + * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
    +  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
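    +    // Each call to get() below returns a fresh Table, since HBase Table instances are lightweight and not thread-safe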
    +    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
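    +    // Negative inputs mean no usable time or bound was provided; -1 signals that no prune upper bound is available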
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save the prune upper bound for time as the final step.
    +      // Its existence can then be used to determine whether the data for a given time is complete
    +      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
    +  }
    +
    +  /**
    +   * After the invalid list has been pruned, this cleans up state information that is no longer required.
    +   * This includes:
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound records for regions, where the
    +   *     recorded bound is older than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions)</i> - Region sets that were recorded on or before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, prune upper bound)</i> - Smallest not in-progress transaction without any writes in
    +   *     new regions, where this information was recorded on or before the start time of maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get the regions saved for the given time, so as not to delete them
    +    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before {}", maxPrunedInvalid);
    +      dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(maxPrunedInvalid, regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
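    +    // Tephra transaction IDs encode time: up to TxConstants.MAX_TX_PER_MS ids are issued per millisecond,
    +    // so dividing by MAX_TX_PER_MS recovers the approximate timestamp of the pruned transaction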
    +    long pruneTime = maxPrunedInvalid / TxConstants.MAX_TX_PER_MS;
    +    LOG.debug("Deleting regions recorded before time {}", pruneTime);
    +    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
    +    LOG.debug("Deleting prune upper bounds recorded on or before time {}", pruneTime);
    +    dataJanitorState.deletePruneUpperBoundsOnOrBeforeTime(pruneTime);
    +  }
    +
    +  @Override
    +  public void destroy() {
    +    LOG.info("Stopping plugin...");
    +    try {
    +      connection.close();
    +    } catch (IOException e) {
    +      LOG.error("Got exception while closing HBase connection", e);
    +    }
    +  }
    +
    +  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    +    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
    +  }
    +
    +  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
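    +    // byte[] has no natural ordering, so the TreeSet needs an explicit byte-wise comparator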
    +    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +    try (Admin admin = connection.getAdmin()) {
    +      HTableDescriptor[] tableDescriptors = admin.listTables();
    +      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
    +      if (tableDescriptors != null) {
    +        for (HTableDescriptor tableDescriptor : tableDescriptors) {
    +          if (isTransactionalTable(tableDescriptor)) {
    +            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
    +            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
    +            if (tableRegions != null) {
    +              for (HRegionInfo region : tableRegions) {
    +                regions.add(region.getRegionName());
    +              }
    +            }
    +          } else {
    +            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
    +          }
    +        }
    +      }
    +    }
    +    return regions;
    +  }
    +
    +  /**
    +   * Tries to find the latest set of regions in which all regions have been major compacted, and
    +   * computes the prune upper bound from them. Starting from newest to oldest, this looks into the
    +   * region sets that have been saved periodically, and joins them with the prune upper bound data
    +   * recorded for each region after a major compaction.
    +   *
    +   * @param timeRegions the latest set of regions
    +   * @return prune upper bound
    +   * @throws IOException when not able to talk to HBase
    +   */
    +  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    +    do {
    +      LOG.debug("Computing prune upper bound for {}", timeRegions);
    +      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
    +      long time = timeRegions.getTime();
    +
    +      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
    +      logPruneUpperBoundRegions(pruneUpperBoundRegions);
    +      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
    +      // across all regions
    +      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
    +        long pruneUpperBoundForTime = dataJanitorState.getPruneUpperBoundForTime(time);
    +        LOG.debug("Found max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
    +        // If pruneUpperBoundForTime is not recorded then that means the data is not complete for these regions
    +        if (pruneUpperBoundForTime != -1) {
    --- End diff --
    
    It would be good to debug-log if it is -1. That would indicate the reason for the debug message in line 264.
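    
    Something along these lines, as a minimal sketch (the exact message wording is just a suggestion):
    
    ```java
    if (pruneUpperBoundForTime != -1) {
      // ... compute the final prune upper bound across regions, as before ...
    } else {
      LOG.debug("Prune upper bound not recorded for time {}; data for these regions is incomplete", time);
    }
    ```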


