phoenix-issues mailing list archives

From: GitBox <...@apache.org>
Subject: [GitHub] [phoenix] kadirozde commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
Date: Fri, 31 May 2019 16:46:10 GMT
kadirozde commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r289467355
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 ##########
 @@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.LockManager.RowLock;
+import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
+import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
+import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
+ * Phoenix always does batch mutations.
+ * <p>
+ */
+public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
+
+  private static final Log LOG = LogFactory.getLog(IndexRegionObserver.class);
+  private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+  private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
+
+  /**
+   * Class to represent pending data table rows
+   */
+  private static class PendingRow {
+      private long latestTimestamp;
+      private long count;
+      PendingRow(long latestTimestamp) {
+          count = 1;
+          this.latestTimestamp = latestTimestamp;
+      }
+
+      public void add(long timestamp) {
+          count++;
+          if (latestTimestamp < timestamp) {
+              latestTimestamp = timestamp;
+          }
+      }
+
+      public void remove() {
+          count--;
+      }
+
+      public long getCount() {
+          return count;
+      }
+
+      public long getLatestTimestamp() {
+          return latestTimestamp;
+      }
+  }
+
+  private static boolean skipPostIndexUpdatesForTesting = false;
+  private static boolean skipDataTableUpdatesForTesting = false;
+
+  public static void setSkipPostIndexUpdatesForTesting(boolean skip) {
+      skipPostIndexUpdatesForTesting = skip;
+  }
+
+  public static void setSkipDataTableUpdatesForTesting(boolean skip) {
+      skipDataTableUpdatesForTesting = skip;
+  }
+
+  // Hack to get around not being able to save any state between
+  // coprocessor calls. TODO: remove after HBASE-18127 when available
+  private static class BatchMutateContext {
+      private final int clientVersion;
+      // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
+      // the verified column) will have the value false ("unverified") on these mutations
+      private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+      // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
+      // the verified column) will have the value true ("verified") on the put mutations
+      private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      // The collection of candidate index mutations that will be applied after the data table mutations
+      private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+      // The set of row keys for the data table rows of this batch such that for each of these rows there exists another
+      // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
+      // row (i.e., concurrent updates).
+      private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+
+      private BatchMutateContext(int clientVersion) {
+          this.clientVersion = clientVersion;
+      }
+  }
+  
+  private ThreadLocal<BatchMutateContext> batchMutateContext =
+          new ThreadLocal<BatchMutateContext>();
+  
+  /** Configuration key for the {@link IndexBuilder} to use */
+  public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+  /**
+   * Configuration key for whether the indexer should check the version of HBase it is running on. Generally,
+   * you only want to disable this check for testing or for custom versions of HBase.
+   */
+  public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
+
+  public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
+  private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
+
+  private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
+  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+  private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
+  // Index writers get invoked before and after data table updates
+  protected IndexWriter preWriter;
+  protected IndexWriter postWriter;
+
+  protected IndexBuildManager builder;
+  private LockManager lockManager;
+
+  // The collection of pending data table rows
+  private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>();
+
+  /**
+   * Cache of the failed updates to the various regions. Used for making the WAL recovery mechanisms
+   * more robust in the face of recovering index regions that were on the same server as the
+   * primary table region
+   */
+  private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+  /**
+   * IndexWriter for writing the recovered index edits. Separate from the main indexer since we need
+   * different write/failure policies
+   */
+  private IndexWriter recoveryWriter;
+
+  private MetricsIndexerSource metricSource;
+
+  private boolean stopped;
+  private boolean disabled;
+  private long slowIndexWriteThreshold;
+  private long slowIndexPrepareThreshold;
+  private long slowPostOpenThreshold;
+  private long slowPreIncrementThreshold;
+  private int rowLockWaitDuration;
+
+  private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+      try {
+        final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+        String serverName = env.getServerName().getServerName();
+        if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
+          // make sure the right version <-> combinations are allowed.
+          String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+          if (errormsg != null) {
+              throw new FatalIndexBuildingFailureException(errormsg);
+          }
+        }
+
+        this.builder = new IndexBuildManager(env);
+        // Clone the config since it is shared
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
+        // setup the actual index preWriter
+        this.preWriter = new IndexWriter(indexWriterEnv, serverName + "-index-preWriter");
+        if (env.getConfiguration().getBoolean(INDEX_LAZY_POST_BATCH_WRITE, INDEX_LAZY_POST_BATCH_WRITE_DEFAULT)) {
+            this.postWriter = new IndexWriter(indexWriterEnv, new LazyParallelWriterIndexCommitter(), serverName + "-index-postWriter");
+        }
+        else {
+            this.postWriter = this.preWriter;
+        }
+        
+        this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
+                DEFAULT_ROWLOCK_WAIT_DURATION);
+        this.lockManager = new LockManager();
+
+        // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+        this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+        setSlowThresholds(e.getConfiguration());
+
+        try {
+          // get the specified failure policy. We only ever override it in tests, but we need to do it
+          // here
+          Class<? extends IndexFailurePolicy> policyClass =
+              env.getConfiguration().getClass(INDEX_RECOVERY_FAILURE_POLICY_KEY,
+                StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
+          IndexFailurePolicy policy =
+              policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
+          LOG.debug("Setting up recovery writter with failure policy: " + policy.getClass());
+          recoveryWriter =
+              new RecoveryIndexWriter(policy, indexWriterEnv, serverName + "-recovery-writer");
+        } catch (Exception ex) {
+          throw new IOException("Could not instantiate recovery failure policy!", ex);
+        }
+      } catch (NoSuchMethodError ex) {
+          disabled = true;
+          LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+      }
+  }
+
+  /**
+   * Extracts the slow call threshold values from the configuration.
+   */
+  private void setSlowThresholds(Configuration c) {
+      slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
+      slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+      slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+          INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+      slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+          INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+  }
+
+  private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+      StringBuilder sb = new StringBuilder(64);
+      sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration);
+      sb.append("ms, threshold=").append(threshold).append("ms");
+      return sb.toString();
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (this.stopped) {
+      return;
+    }
+    if (this.disabled) {
+        return;
+      }
+    this.stopped = true;
+    String msg = "Indexer is being stopped";
+    this.builder.stop(msg);
+    this.preWriter.stop(msg);
+    this.recoveryWriter.stop(msg);
+    this.postWriter.stop(msg);
+  }
+
+  /**
+   * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+   * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+   * real increment, though, it's really more of a Put. We translate the Increment into a
+   * list of mutations, at most a single Put and Delete that are the changes upon executing
+   * the list of ON DUPLICATE KEY clauses for this row.
+   */
+  @Override
+  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+          final Increment inc) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+          if (mutations == null) {
+              return null;
+          }
+
+          // Causes the Increment to be ignored as we're committing the mutations
+          // ourselves below.
+          e.bypass();
+          // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+          // as no action is required in that case.
+          if (!mutations.isEmpty()) {
+              Region region = e.getEnvironment().getRegion();
+              // Otherwise, submit the mutations directly here
+                region.batchMutate(mutations.toArray(new Mutation[0]));
+          }
+          return Result.EMPTY_RESULT;
+      } catch (Throwable t) {
+          throw ServerUtil.createIOException(
+                  "Unable to process ON DUPLICATE IGNORE for " + 
 
 Review comment:
   As you know, IndexRegionObserver will eventually replace Indexer. I started with Indexer and made the
changes required to implement IndexRegionObserver. I did not need to touch this method, so the code is
inherited from Indexer.
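
For readers coming to this thread without Phoenix context, a minimal client-side sketch of the kind of statement that exercises the preIncrementAfterRowLock path quoted above. The table name, schema, and connection string are hypothetical; the ON DUPLICATE KEY syntax itself is standard Phoenix:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class OnDuplicateKeyUpsertExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical table and connection string; adjust for your cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS EXAMPLE_T (PK VARCHAR PRIMARY KEY, COUNTER BIGINT)");
                // Phoenix serializes the ON DUPLICATE KEY clause into an HBase Increment, which the
                // coprocessor unpacks in preIncrementAfterRowLock and applies atomically under the row lock.
                stmt.execute("UPSERT INTO EXAMPLE_T (PK, COUNTER) VALUES ('row1', 0) "
                        + "ON DUPLICATE KEY UPDATE COUNTER = COUNTER + 1");
                conn.commit();
            }
        }
    }

Similarly, the BatchMutateContext comments earlier in the patch describe the two-phase, verified/unverified index write that gives PHOENIX-5156 its consistency guarantee. The sketch below is illustrative only, not the patch's code: in the patch the ordering is driven by preBatchMutate/postBatchMutate on the region server, and the table names, column family, index row key layout, and empty-column qualifier used here are assumptions for the example.

    import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
    import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class VerifiedIndexWriteSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            byte[] family = Bytes.toBytes("0");        // assumed column family
            byte[] emptyColumn = Bytes.toBytes("_0");  // assumed empty/verified column qualifier
            byte[] dataRowKey = Bytes.toBytes("row1");
            byte[] indexRowKey = Bytes.add(Bytes.toBytes("v1"), new byte[] { 0 }, dataRowKey);
            long ts = System.currentTimeMillis();

            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table dataTable = conn.getTable(TableName.valueOf("DATA_TABLE"));
                 Table indexTable = conn.getTable(TableName.valueOf("INDEX_TABLE"))) {
                // Phase 1 (pre-index): write the index row marked "unverified" before the data table.
                indexTable.put(new Put(indexRowKey).addColumn(family, emptyColumn, ts, FALSE_BYTES));

                // Phase 2 (data): apply the data table mutation.
                dataTable.put(new Put(dataRowKey).addColumn(family, Bytes.toBytes("V"), ts, Bytes.toBytes("v1")));

                // Phase 3 (post-index): only after the data write succeeds, flip the index row to "verified".
                indexTable.put(new Put(indexRowKey).addColumn(family, emptyColumn, ts, TRUE_BYTES));
            }
        }
    }

If a region server fails between phases, the index row is left "unverified" and readers fall back to verifying against the data table, which is what makes the global index consistent without transactions.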

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
