phoenix-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [phoenix] gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
Date Thu, 02 May 2019 23:57:53 GMT
gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280635199
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
 ##########
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.util.IndexUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Abstract class to write index updates to the index tables in parallel.
+ */
+public abstract class AbstractParallelWriterIndexCommitter implements IndexCommitter {
+
+    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+    public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+    private static final Log LOG = LogFactory.getLog(AbstractParallelWriterIndexCommitter.class);
+
+    protected HTableFactory retryingFactory;
+    protected HTableFactory noRetriesfactory;
+    protected Stoppable stopped;
+    protected QuickFailingTaskRunner pool;
+    protected KeyValueBuilder kvBuilder;
+    protected RegionCoprocessorEnvironment env;
+    protected TaskBatch<Void> tasks;
+
+    public AbstractParallelWriterIndexCommitter() {}
+
+    // For testing
+    public AbstractParallelWriterIndexCommitter(String hbaseVersion) {
+        kvBuilder = KeyValueBuilder.get(hbaseVersion);
+    }
+
+    @Override
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+        this.env = env;
+        Configuration conf = env.getConfiguration();
+        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+                ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
+        this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+    }
+
+    /**
+     * Setup <tt>this</tt>.
+     * <p>
+     * Exposed for TESTING
+     */
+    public void setup(HTableFactory factory, ExecutorService pool, Stoppable stop, RegionCoprocessorEnvironment env) {
+        this.retryingFactory = factory;
+        this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
+        this.pool = new QuickFailingTaskRunner(pool);
+        this.stopped = stop;
+    }
+
+    abstract HTableFactory getHTableFactory(int clientVersion);
+
+    @Override
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates, final int clientVersion) throws SingleIndexWriteFailureException {
+        /*
+         * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
+         * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
+         * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
+         * successfully updated, or (2) any one of the index table writes fails; in either case, we should return as
+         * quickly as possible. It gets a little more complicated in that if we do get a single failure, but any of the
+         * index writes hasn't been started yet (it's been queued up, but not submitted to a thread), we want that task
+         * to fail immediately, as we know that write is a waste and will need to be replayed anyway.
+         */
+
+        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+        tasks = new TaskBatch<Void>(entries.size());
+        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+            // get the mutations for each table. We leak the implementation here a little bit to save
+            // doing a complete copy over of all the index updates for each table.
+            final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+            final HTableInterfaceReference tableReference = entry.getKey();
+            if (env != null
+                    && !allowLocalUpdates
+                    && tableReference.getTableName().equals(
+                            env.getRegion().getTableDescriptor().getTableName().getNameAsString())) {
+                continue;
+            }
+            /*
+             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+             * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
+             * The former will only work if we are not in the midst of writing the current batch to the table, though we
+             * do check these status variables before starting and before writing the batch. The latter usage,
+             * interrupting the thread, will work in the previous situations as well as at some points while writing the
+             * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+             * elaborate on whether it supports an interrupt).
+             */
+            tasks.add(new Task<Void>() {
+
+                /**
+                 * Do the actual write of the index updates to the index table.
+                 */
+                @SuppressWarnings("deprecation")
+                @Override
+                public Void call() throws Exception {
+                    Table table = null;
+                    // this may have been queued, so another task in front of us may have failed;
+                    // if that's the case, we should exit early
+                    throwFailureIfDone();
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
+                    }
+                    try {
+                        if (allowLocalUpdates
+                                && env != null
+                                && tableReference.getTableName().equals(
+                                    env.getRegion().getTableDescriptor().getTableName().getNameAsString())) {
+                            try {
+                                throwFailureIfDone();
+                                IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
+                                return null;
+                            } catch (IOException ignored) {
+                                // when it fails, we fall back to the standard & slow way
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("indexRegion.batchMutate failed, falling back to HTable.batch(). Got error="
+                                            + ignored);
+                                }
+                            }
+                        }
+                     // if the client can retry index writes, then we don't need to retry here
 
 Review comment:
   nit: spacing
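
   For context on the pattern the quoted comment block describes, here is a minimal,
   illustrative sketch (not part of this PR) of the quick-fail parallel write: one task
   per index table, block until all succeed or the first fails, and cancel anything
   still queued. It uses only java.util.concurrent; the names QuickFailSketch,
   writeAllOrFailFast, and perTableWrites are hypothetical stand-ins for the patch's
   Task/TaskBatch/QuickFailingTaskRunner machinery.

   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class QuickFailSketch {

       // Submit one write per index table, then block until either every write
       // succeeds or the first one fails, cancelling whatever is still pending.
       static void writeAllOrFailFast(List<Runnable> perTableWrites) throws Exception {
           // 10 mirrors DEFAULT_CONCURRENT_INDEX_WRITER_THREADS in the patch
           ExecutorService pool = Executors.newFixedThreadPool(10);
           ExecutorCompletionService<Void> done = new ExecutorCompletionService<>(pool);
           List<Future<Void>> futures = new ArrayList<>();
           for (Runnable write : perTableWrites) {
               futures.add(done.submit(write, null));
           }
           try {
               for (int i = 0; i < perTableWrites.size(); i++) {
                   done.take().get(); // rethrows the first failure as ExecutionException
               }
           } catch (ExecutionException e) {
               // one write failed: interrupt running tasks and drop queued ones,
               // since those writes are wasted and will be replayed anyway
               for (Future<Void> f : futures) {
                   f.cancel(true);
               }
               throw e;
           } finally {
               pool.shutdown();
           }
       }
   }

   As the second quoted comment notes, cancel(true) only helps mid-write if the
   underlying call (e.g. HTableInterface#batch) actually responds to interrupts.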

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
