hbase-issues mailing list archives

From "James Taylor (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HBASE-10254) Optionally return null when attempting to Increment KeyValue that doesn't exist
Date Mon, 30 Dec 2013 20:09:51 GMT

    [ https://issues.apache.org/jira/browse/HBASE-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13859046#comment-13859046 ]

James Taylor commented on HBASE-10254:
--------------------------------------

As an experiment, I copy/pasted the HRegion.increment() code, adding an incrementTimestamp
argument and returning null if the KeyValue doesn't exist (see below). Here's what I learned:
- you can't get at the HRegion.updatesLock, so this is likely problematic
- you can't call HRegion.requestFlush()
- too much of the guts of the implementation leaks out to coprocessors IMO

I'm thinking of just writing my own coprocessor that locks the row and does a Get followed by
a Put through the HRegion methods. It'll be a lot less code (see the sketch after the copied
code below).

package com.salesforce.phoenix.coprocessor;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceEndpointImpl extends BaseEndpointCoprocessor {
    private static final Logger LOG = LoggerFactory.getLogger(SequenceEndpointImpl.class);
    private static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

    // If updating multiple rows in one call, wait longer,
    // i.e. waiting for busyWaitDuration * # of rows. However,
    // we can limit the max multiplier.
    private int maxBusyWaitMultiplier;

    // Max busy wait duration. There is no point to wait longer than the RPC
    // purge timeout, when a RPC call will be terminated by the RPC engine.
    private long maxBusyWaitDuration;
    // The internal wait duration to acquire a lock before read/update
    // from the region. It is not per row. The purpose of this wait time
    // is to avoid waiting a long time while the region is busy, so that
    // we can release the IPC handler soon enough to improve the
    // availability of the region server. It can be adjusted by
    // tuning configuration "hbase.busy.wait.duration".
    private long busyWaitDuration;
    private long memstoreFlushSize;
    private boolean deferredLogSyncDisabled;

    // Stop updates lock
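    // (Note: this is a coprocessor-local lock; the real HRegion.updatesLock isn't accessible, per above)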
    private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

    @Override
    public void start(CoprocessorEnvironment e) {
        Configuration conf = e.getConfiguration();
        this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
        this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
        if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
            throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
                    + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
                    + maxBusyWaitMultiplier + "). Their product should be positive");
        }
        this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
                2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
        RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
        HRegion region = env.getRegion();
        long flushSize = region.getTableDesc().getMemStoreFlushSize();

        if (flushSize <= 0) {
            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
        }
        this.memstoreFlushSize = flushSize;
        // When hbase.regionserver.optionallogflushinterval <= 0, deferred log sync is disabled.
        this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
                1 * 1000) <= 0;

    }

    public static boolean rowIsInRange(HRegion info, final byte[] row) {
        return ((info.getStartKey().length == 0) || (Bytes.compareTo(info.getStartKey(), row) <= 0))
                && ((info.getEndKey().length == 0) || (Bytes.compareTo(info.getEndKey(), row) > 0));
    }

    /** Make sure this is a valid row for the HRegion */
    void checkRow(final byte[] row, String op, HRegion region) throws IOException {
        if (!rowIsInRange(region, row)) {
            throw new WrongRegionException("Requested row out of range for " + op
                    + " on HRegion " + region + ", startKey='"
                    + Bytes.toStringBinary(region.getStartKey()) + "', endKey='"
                    + Bytes.toStringBinary(region.getEndKey()) + "', row='"
                    + Bytes.toStringBinary(row) + "'");
        }
    }

    private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException {
        lock(lock, 1);
    }

    /**
     * Try to acquire a lock. Throw RegionTooBusyException if we fail to get the lock in time.
     * Throw InterruptedIOException if interrupted while waiting for the lock.
     */
    private void lock(final Lock lock, final int multiplier)
            throws RegionTooBusyException, InterruptedIOException {
        try {
            final long waitTime = Math.min(maxBusyWaitDuration,
                    busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
            if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { throw new RegionTooBusyException(
                    "failed to get a lock in " + waitTime + "ms"); }
        } catch (InterruptedException ie) {
            LOG.info("Interrupted while waiting for a lock");
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
        }
    }

    @Override
    public RegionCoprocessorEnvironment getEnvironment() {
        return (RegionCoprocessorEnvironment)super.getEnvironment();
    }

    /*
     * @param size
     * @return True if size is over the flush threshold
     */
    private boolean isFlushSize(final long size) {
        return size > memstoreFlushSize;
    }

    /**
     * check if current region is deferred sync enabled.
     */
    private boolean isDeferredLogSyncEnabled(HRegion region) {
        return (region.getTableDesc().isDeferredLogFlush() && !this.deferredLogSyncDisabled);
    }

    public Long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
            TimeRange tr, long incrementTimestamp, boolean writeToWAL) throws IOException {
        RegionCoprocessorEnvironment env = this.getEnvironment();
        HRegion region = env.getRegion();
        HTableDescriptor htableDescriptor = region.getTableDesc();
        checkRow(row, "increment", region);
        boolean flush = false;
        long txid = 0;
        // Lock row
        region.startRegionOperation();
        try {
            Integer lid = region.getLock(null, row, true);
            lock(this.updatesLock.readLock());
            try {
                Store store = region.getStore(family);

                // Get the old value:
                Get get = new Get(row);
                get.setTimeRange(tr.getMin(), tr.getMax());
                get.addColumn(family, qualifier);

                Result result = region.get(get);
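                // Key change vs. HRegion.increment(): return null instead of starting from 0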
                if (result.isEmpty()) { return null; }

                KeyValue kv = result.raw()[0];
                if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
                    byte[] buffer = kv.getBuffer();
                    int valueOffset = kv.getValueOffset();
                    amount += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
                } else {
                    throw new DoNotRetryIOException(
                            "Attempted to increment field that isn't 64 bits wide");
                }
                // build the KeyValue now:
                KeyValue newKv = new KeyValue(row, family, qualifier, incrementTimestamp,
                        Bytes.toBytes(amount));

                // now log it:
                if (writeToWAL) {
                    long now = EnvironmentEdgeManager.currentTimeMillis();
                    WALEdit walEdit = new WALEdit();
                    walEdit.add(newKv);
                    // Using default cluster id, as this can only happen in the
                    // originating cluster. A slave cluster receives the final value (not
                    // the delta) as a Put.
                    txid = region.getLog().appendNoSync(region.getRegionInfo(),
                            htableDescriptor.getName(), walEdit,
                            HConstants.DEFAULT_CLUSTER_ID, now, htableDescriptor);
                }

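                // Store.upsert replaces any existing memstore cell for this row/family/qualifier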
                long size = store.upsert(Collections.singletonList(newKv));
                size = region.addAndGetGlobalMemstoreSize(size);
                flush = isFlushSize(size);
            } finally {
                this.updatesLock.readLock().unlock();
                region.releaseRowLock(lid);
            }
            if (writeToWAL) {
                // sync the transaction log outside the rowlock
                if (!isDeferredLogSyncEnabled(region)) {
                    region.getLog().sync(txid);
                }
            }
        } finally {
            region.closeRegionOperation();
        }

        if (flush) {
            // Request a cache flush. Do it outside update lock.
            // This method isn't exposed on HRegion, so we can't do this
            // requestFlush();
        }
        return amount;
    }

}
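
For reference, here's a rough sketch of the Get-followed-by-Put alternative mentioned above.
It's illustrative only (the method name is made up, and it assumes the 0.94 HRegion row lock
API), but it shows how little of HRegion's internals it needs:

    // Hypothetical sketch: increment through public HRegion methods only.
    // Returns null when the cell doesn't exist, per the proposed behavior.
    private Long getThenPut(HRegion region, byte[] row, byte[] family, byte[] qualifier,
            long amount, long incrementTimestamp) throws IOException {
        region.startRegionOperation();
        try {
            Integer lid = region.getLock(null, row, true); // hold the row lock across Get + Put
            try {
                Get get = new Get(row);
                get.addColumn(family, qualifier);
                Result result = region.get(get);
                if (result.isEmpty()) { return null; } // don't create a KeyValue from 0
                // (8-byte width check on the existing value omitted for brevity)
                long newValue = amount + Bytes.toLong(result.getValue(family, qualifier));
                Put put = new Put(row);
                put.add(family, qualifier, incrementTimestamp, Bytes.toBytes(newValue));
                region.put(put, lid); // WAL append/sync and flush requests stay inside HRegion
                return newValue;
            } finally {
                region.releaseRowLock(lid);
            }
        } finally {
            region.closeRegionOperation();
        }
    }

The tradeoff is a regular Put (a new KeyValue version) instead of the in-place memstore
upsert that HRegion.increment() does, but none of the locking/flush internals need to be
copied.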


> Optionally return null when attempting to Increment KeyValue that doesn't exist
> -------------------------------------------------------------------------------
>
>                 Key: HBASE-10254
>                 URL: https://issues.apache.org/jira/browse/HBASE-10254
>             Project: HBase
>          Issue Type: Bug
>            Reporter: James Taylor
>
> Instead of creating a new KeyValue starting from 0 when an Increment is done on a row
> that doesn't exist, we should optionally return null. A Get is already being done, so it's
> easy to detect this case. This can be done in a backward compatible manner if the behavior
> is done optionally. In addition, Increment does not allow me to specify the timestamp to use
> for the KeyValue. This should be added as well.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
