falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peeyu...@apache.org
Subject [2/3] falcon git commit: FALCON-1858 Support HBase as a storage backend for Falcon Titan graphDB
Date Wed, 25 May 2016 17:16:21 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
new file mode 100644
index 0000000..b4dc12e
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.util.system.IOUtils;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*;
+
+/**
+ * Here are some areas that might need work:
+ * <p/>
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ * all columns on the row where it reaches its limit.  This requires getSlice,
+ * currently, to impose its limit on the client side.  That obviously won't
+ * scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ * <p/>
+ * There may be other problem areas.  These are just the ones of which I'm aware.
+ */
+public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
+
+    private final String tableName; // HBase table that every Get/Scan of this store is issued against
+    private final HBaseStoreManager storeManager; // owning manager; mutations are delegated to it
+
+    // With shortened CF names enabled, the column family is the short name and storeName is the long name.
+    // With shortened CF names disabled, the two are identical.
+    //private final String columnFamily;
+    private final String storeName;
+    // Byte form of the column family name; used to address the family in Gets, Scans and results
+    private final byte[] columnFamilyBytes;
+    private final HBaseGetter entryGetter; // adapts HBase result cells to Titan entries
+
+    private final ConnectionMask cnx; // source of table handles
+
+    private LocalLockMediator<StoreTransaction> localLockMediator; // used by acquireLock to claim locks
+
+    private Duration lockExpiryTime; // read from LOCK_EXPIRE; added to the lock start time to get the expiry
+
+    /**
+     * Creates a store backed by one column family of an HBase table.
+     *
+     * @param storeManager manager that owns this store; supplies the metadata schema,
+     *                     the storage configuration and the mutation path
+     * @param cnx          connection used to obtain table handles
+     * @param tableName    name of the HBase table to read from
+     * @param columnFamily column family name as stored in HBase (the short name when
+     *                     shortened CF names are in use)
+     * @param storeName    Titan store name reported by {@link #getName()}
+     * @param llm          mediator used by {@link #acquireLock} to claim locks
+     */
+    HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
+        this.storeManager = storeManager;
+        this.cnx = cnx;
+        this.tableName = tableName;
+        //this.columnFamily = columnFamily;
+        this.storeName = storeName;
+        // Bytes.toBytes always encodes as UTF-8. String#getBytes() depends on the JVM's
+        // default charset, so the family bytes could differ from host to host.
+        this.columnFamilyBytes = Bytes.toBytes(columnFamily);
+        this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
+        this.localLockMediator = llm;
+        this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+    }
+
+    /**
+     * Does nothing: this store holds no table handle between calls, so there is
+     * nothing to release here.
+     */
+    @Override
+    public void close() throws BackendException {
+        // intentionally empty
+    }
+
+    /**
+     * Reads the slice of columns selected by {@code query} for the query's single key.
+     *
+     * @param query key plus column slice to read
+     * @param txh   transaction handle (not used by this method)
+     * @return the entries found for the key, or {@link EntryList#EMPTY_LIST} when the
+     *         lookup produced no entry list
+     */
+    @Override
+    public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+        final List<StaticBuffer> singleKey = Arrays.asList(query.getKey());
+        final Filter sliceFilter = getFilter(query);
+        final Map<StaticBuffer, EntryList> entriesByKey = getHelper(singleKey, sliceFilter);
+        return Iterables.getOnlyElement(entriesByKey.values(), EntryList.EMPTY_LIST);
+    }
+
+    /**
+     * Reads the same column slice for every key in {@code keys}.
+     *
+     * @param keys  row keys to look up
+     * @param query column slice applied to each key
+     * @param txh   transaction handle (not used by this method)
+     * @return map from key to the entries found for it
+     */
+    @Override
+    public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+        final Filter sliceFilter = getFilter(query);
+        return getHelper(keys, sliceFilter);
+    }
+
+    /**
+     * Applies additions and deletions to a single key by wrapping them in a one-entry
+     * mutation map and handing it to {@code mutateMany}.
+     *
+     * @param key       row key to mutate
+     * @param additions entries to write
+     * @param deletions columns to delete
+     * @param txh       transaction the mutation belongs to
+     */
+    @Override
+    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+        final KCVMutation change = new KCVMutation(additions, deletions);
+        final Map<StaticBuffer, KCVMutation> singleKeyMutation = ImmutableMap.of(key, change);
+        mutateMany(singleKeyMutation, txh);
+    }
+
+    /**
+     * Claims a lock on the given key/column pair through the lock mediator and, on
+     * success, records the expected value on the transaction.
+     * <p/>
+     * The lock's expiry is the current nano-time timepoint plus {@code lockExpiryTime}.
+     *
+     * @param key           row key of the cell to lock
+     * @param column        column of the cell to lock
+     * @param expectedValue value passed to {@code HBaseTransaction#updateLocks} together with the lock id
+     * @param txh           transaction requesting the lock; must be an {@code HBaseTransaction}
+     * @throws PermanentLockingException if the mediator refuses the lock
+     */
+    @Override
+    public void acquireLock(StaticBuffer key,
+                            StaticBuffer column,
+                            StaticBuffer expectedValue,
+                            StoreTransaction txh) throws BackendException {
+
+        KeyColumn lockID = new KeyColumn(key, column);
+        logger.debug("Attempting to acquireLock on {} ", lockID);
+        final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS);
+        boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime));
+        if (!locked) {
+            // Plain concatenation: a "{}" placeholder is not expanded in an exception message.
+            throw new PermanentLockingException("Could not lock the keyColumn " + lockID
+                    + " on CF " + Bytes.toString(columnFamilyBytes));
+        }
+        ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
+    }
+
+    /**
+     * Scans the keys between the query's start and end key, restricted to the query's
+     * column slice.
+     *
+     * @param query key range and column slice to scan
+     * @param txh   transaction handle (not used by this method)
+     * @return iterator over the matching row keys
+     */
+    @Override
+    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+        final byte[] firstKey = query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY);
+        final byte[] stopKey = query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY);
+        final FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        return executeKeySliceQuery(firstKey, stopKey, filters, query);
+    }
+
+    /**
+     * @return the store name this instance was constructed with
+     */
+    @Override
+    public String getName() {
+        return this.storeName;
+    }
+
+    /**
+     * Scans all keys of this store's column family, restricted to the given column slice.
+     *
+     * @param query column slice to apply to each row
+     * @param txh   transaction handle (not used by this method)
+     * @return iterator over the matching row keys
+     */
+    @Override
+    public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+        final FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        return executeKeySliceQuery(filters, query);
+    }
+
+    /**
+     * Builds the HBase column filter corresponding to a Titan slice query.
+     * <p/>
+     * The result is a {@link ColumnRangeFilter} with an inclusive start and an exclusive
+     * end. An empty bound is passed as {@code null}, which {@code ColumnRangeFilter}
+     * treats as "no bound on that side". When the query has a limit, the range filter is
+     * combined (MUST_PASS_ALL) with a {@link ColumnPaginationFilter} using that limit and
+     * an offset of 0.
+     *
+     * @param query slice query supplying the column bounds and the optional limit
+     * @return the filter to attach to a Get or Scan
+     */
+    public static Filter getFilter(SliceQuery query) {
+        // Test each bound against its own length so that a non-empty start bound is kept
+        // even when the end bound is empty.
+        byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
+        byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
+
+        Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
+
+        if (query.hasLimit()) {
+            filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    filter,
+                    new ColumnPaginationFilter(query.getLimit(), 0));
+        }
+
+        logger.debug("Generated HBase Filter {}", filter);
+
+        return filter;
+    }
+
+    private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
+        List<Get> requests = new ArrayList<Get>(keys.size());
+        {
+            for (StaticBuffer key : keys) {
+                Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
+                try {
+                    g.setTimeRange(0, Long.MAX_VALUE);
+                } catch (IOException e) {
+                    throw new PermanentBackendException(e);
+                }
+                requests.add(g);
+            }
+        }
+
+        Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
+
+        try {
+            TableMask table = null;
+            Result[] results = null;
+
+            try {
+                table = cnx.getTable(tableName);
+                logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+                results = table.get(requests);
+                logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+            } finally {
+                IOUtils.closeQuietly(table);
+            }
+
+            if (results == null)
+                return KCVSUtil.emptyResults(keys);
+
+            assert results.length==keys.size();
+
+            for (int i = 0; i < results.length; i++) {
+                Result result = results[i];
+                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
+
+                if (f == null) { // no result for this key
+                    resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
+                    continue;
+                }
+
+                // actual key with <timestamp, value>
+                NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
+                resultMap.put(keys.get(i), (r == null)
+                                            ? EntryList.EMPTY_LIST
+                                            : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
+            }
+
+            return resultMap;
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+        storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
+    }
+
+    private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
+        return executeKeySliceQuery(null, null, filters, columnSlice);
+    }
+
+    private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
+                                            @Nullable byte[] endKey,
+                                            FilterList filters,
+                                            @Nullable SliceQuery columnSlice) throws BackendException {
+        Scan scan = new Scan().addFamily(columnFamilyBytes);
+
+        try {
+            scan.setTimeRange(0, Long.MAX_VALUE);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        if (startKey != null)
+            scan.setStartRow(startKey);
+
+        if (endKey != null)
+            scan.setStopRow(endKey);
+
+        if (columnSlice != null) {
+            filters.addFilter(getFilter(columnSlice));
+        }
+
+        TableMask table = null;
+
+        logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
+        try {
+            table = cnx.getTable(tableName);
+            return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
+        } catch (IOException e) {
+            IOUtils.closeQuietly(table);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    /**
+     * Key iterator over the rows returned by an HBase scanner.
+     * <p/>
+     * {@link #next()} advances to the next row and returns its key; {@link #getEntries()}
+     * then iterates the columns of that current row. {@link #close()} releases both the
+     * scanner and the table handle given to the constructor.
+     */
+    private class RowIterator implements KeyIterator {
+        private final Closeable table;
+        // Kept so that close() can release the scanner as well as the table.
+        private final ResultScanner scanner;
+        private final Iterator<Result> rows;
+        private final byte[] columnFamilyBytes;
+
+        private Result currentRow;
+        private boolean isClosed;
+
+        /**
+         * @param table             table handle to close together with this iterator
+         * @param rows              scanner supplying the rows; closed together with this iterator
+         * @param columnFamilyBytes column family whose columns {@link #getEntries()} returns
+         *                          (copied defensively)
+         */
+        public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
+            this.table = table;
+            this.scanner = rows;
+            this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
+            this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
+                @Override
+                public boolean apply(@Nullable Result result) {
+                    if (result == null)
+                        return false;
+
+                    // Skip rows for which reading a long from the key raises NumberFormatException.
+                    try {
+                        StaticBuffer id = StaticArrayBuffer.of(result.getRow());
+                        id.getLong(0);
+                    } catch (NumberFormatException e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+        }
+
+        /**
+         * Returns an iterator over the columns of the row most recently returned by
+         * {@link #next()}. Closing the returned iterator marks this whole row iterator
+         * as closed.
+         */
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            ensureOpen();
+
+            return new RecordIterator<Entry>() {
+                private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator();
+
+                @Override
+                public boolean hasNext() {
+                    ensureOpen();
+                    return kv.hasNext();
+                }
+
+                @Override
+                public Entry next() {
+                    ensureOpen();
+                    return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
+                }
+
+                @Override
+                public void close() {
+                    isClosed = true;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+            return rows.hasNext();
+        }
+
+        /**
+         * Advances to the next row and returns its key.
+         */
+        @Override
+        public StaticBuffer next() {
+            ensureOpen();
+
+            currentRow = rows.next();
+            return StaticArrayBuffer.of(currentRow.getRow());
+        }
+
+        /**
+         * Closes the scanner and then the table, and marks this iterator as closed.
+         */
+        @Override
+        public void close() {
+            // Close the scanner first: it was opened from the table and is otherwise never released.
+            IOUtils.closeQuietly(scanner);
+            IOUtils.closeQuietly(table);
+            isClosed = true;
+            logger.debug("RowIterator closed table {}", table);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** Throws {@link IllegalStateException} once this iterator has been closed. */
+        private void ensureOpen() {
+            if (isClosed)
+                throw new IllegalStateException("Iterator has been closed.");
+        }
+    }
+
+    /**
+     * Adapter that lets Titan's entry factories read a column, value and metadata out of
+     * one HBase result cell, given as a (column, timestamp-to-value map) pair.
+     * The value and timestamp are taken from the last entry of the timestamp map.
+     */
+    private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
+
+        private final EntryMetaData[] schema;
+
+        private HBaseGetter(EntryMetaData[] schema) {
+            this.schema = schema;
+        }
+
+        /** Returns the last (highest-timestamp) version held for the given column. */
+        private static Map.Entry<Long, byte[]> newestVersion(Map.Entry<byte[], NavigableMap<Long, byte[]>> cell) {
+            final NavigableMap<Long, byte[]> versions = cell.getValue();
+            return versions.lastEntry();
+        }
+
+        @Override
+        public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            final byte[] columnName = element.getKey();
+            return columnName;
+        }
+
+        @Override
+        public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return newestVersion(element).getValue();
+        }
+
+        @Override
+        public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            // The schema is fixed per store and does not depend on the element.
+            return this.schema;
+        }
+
+        @Override
+        public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
+            // TIMESTAMP is the only metadata kind handled here.
+            switch(meta) {
+                case TIMESTAMP:
+                    return newestVersion(element).getKey();
+                default:
+                    throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
new file mode 100644
index 0000000..52f28af
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
@@ -0,0 +1,926 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.thinkaurelius.titan.core.TitanException;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.Entry;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.util.system.IOUtils;
+import com.thinkaurelius.titan.util.system.NetworkUtil;
+
+/**
+ * Storage Manager for HBase
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+@PreInitializeConfigOptions
+public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager, CustomizeStoreKCVSManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class);
+
+    public static final ConfigNamespace HBASE_NS =
+            new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options");
+
+    public static final ConfigOption<Boolean> SHORT_CF_NAMES =
+            new ConfigOption<Boolean>(HBASE_NS, "short-cf-names",
+            "Whether to shorten the names of Titan's column families to one-character mnemonics " +
+            "to conserve storage space", ConfigOption.Type.FIXED, true);
+
+    public static final String COMPRESSION_DEFAULT = "-DEFAULT-";
+
+    public static final ConfigOption<String> COMPRESSION =
+            new ConfigOption<String>(HBASE_NS, "compression-algorithm",
+            "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " +
+            "The compression algorithm must be installed and available on the HBase cluster.  Titan cannot install " +
+            "and configure new compression algorithms on the HBase cluster by itself.",
+            ConfigOption.Type.MASKABLE, "GZ");
+
+    public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK =
+            new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check",
+            "Assume that Titan's HBase table and column families already exist. " +
+            "When this is true, Titan will not check for the existence of its table/CFs, " +
+            "nor will it attempt to create them under any circumstances.  This is useful " +
+            "when running Titan without HBase admin privileges.",
+            ConfigOption.Type.MASKABLE, false);
+
+    public static final ConfigOption<String> HBASE_TABLE =
+            new ConfigOption<String>(HBASE_NS, "table",
+            "The name of the table Titan will use.  When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) +
+            " is false, Titan will automatically create this table if it does not already exist.",
+            ConfigOption.Type.LOCAL, "titan");
+
+    /**
+     * Related bug fixed in 0.98.0, 0.94.7, 0.95.0:
+     *
+     * https://issues.apache.org/jira/browse/HBASE-8170
+     */
+    public static final int MIN_REGION_COUNT = 3;
+
+    /**
+     * The total number of HBase regions to create with Titan's table. This
+     * setting only affects table creation; this normally happens just once when
+     * Titan connects to an HBase backend for the first time.
+     */
+    public static final ConfigOption<Integer> REGION_COUNT =
+            new ConfigOption<Integer>(HBASE_NS, "region-count",
+            "The number of initial regions set when creating Titan's HBase table",
+            ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer input) {
+                    return null != input && MIN_REGION_COUNT <= input;
+                }
+            }
+    );
+
+    /**
+     * This setting is used only when {@link #REGION_COUNT} is unset.
+     * <p/>
+     * If Titan's HBase table does not exist, then it will be created with total
+     * region count = (number of servers reported by ClusterStatus) * (this
+     * value).
+     * <p/>
+     * The Apache HBase manual suggests an order-of-magnitude range of potential
+     * values for this setting:
+     *
+     * <ul>
+     *  <li>
+     *   <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>:
+     *   <blockquote>
+     *    What's the optimal number of pre-split regions to create? Mileage will
+     *    vary depending upon your application. You could start low with 10
+     *    pre-split regions / server and watch as data grows over time. It's
+     *    better to err on the side of too little regions and rolling split later.
+     *   </blockquote>
+     *  </li>
+     *  <li>
+     *   <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>:
+     *   <blockquote>
+     *    In general, HBase is designed to run with a small (20-200) number of
+     *    relatively large (5-20Gb) regions per server... Typically you want to
+     *    keep your region count low on HBase for numerous reasons. Usually
+     *    right around 100 regions per RegionServer has yielded the best results.
+     *   </blockquote>
+     *  </li>
+     * </ul>
+     *
+     * These considerations may differ for other HBase implementations (e.g. MapR).
+     */
+    public static final ConfigOption<Integer> REGIONS_PER_SERVER =
+            new ConfigOption<Integer>(HBASE_NS, "regions-per-server",
+            "The number of regions per regionserver to set when creating Titan's HBase table",
+            ConfigOption.Type.MASKABLE, Integer.class);
+
+    /**
+     * If this key is present in either the JVM system properties or the process
+     * environment (checked in the listed order, first hit wins), then its value
+     * must be the full package and class name of an implementation of
+     * {@link HBaseCompat} that has a no-arg public constructor.
+     * <p>
+     * When this <b>is not</b> set, Titan attempts to automatically detect the
+     * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan
+     * then checks the returned version string against a hard-coded list of
+     * supported version prefixes and instantiates the associated compat layer
+     * if a match is found.
+     * <p>
+     * When this <b>is</b> set, Titan will not call
+     * {@code VersionInfo.getVersion()} or read its hard-coded list of supported
+     * version prefixes. Titan will instead attempt to instantiate the class
+     * specified (via the no-arg constructor which must exist) and then attempt
+     * to cast it to HBaseCompat and use it as such. Titan will assume the
+     * supplied implementation is compatible with the runtime HBase version and
+     * make no attempt to verify that assumption.
+     * <p>
+     * Setting this key incorrectly could cause runtime exceptions at best or
+     * silent data corruption at worst. This setting is intended for users
+     * running exotic HBase implementations that don't support VersionInfo or
+     * implementations which return values from {@code VersionInfo.getVersion()}
+     * that are inconsistent with Apache's versioning convention. It may also be
+     * useful to users who want to run against a new release of HBase that Titan
+     * doesn't yet officially support.
+     *
+     */
+    public static final ConfigOption<String> COMPAT_CLASS =
+            new ConfigOption<String>(HBASE_NS, "compat-class",
+            "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " +
+            "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " +
+            "at runtime.  Setting this option forces Titan to instead reflectively load and instantiate the specified class.",
+            ConfigOption.Type.MASKABLE, String.class);
+
+    public static final int PORT_DEFAULT = 9160;
+
+    public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI;
+
+    public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE =
+            new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true);
+
+    private static final BiMap<String, String> SHORT_CF_NAME_MAP =
+            ImmutableBiMap.<String, String>builder()
+                    .put(Backend.INDEXSTORE_NAME, "g")
+                    .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h")
+                    .put(Backend.ID_STORE_NAME, "i")
+                    .put(Backend.EDGESTORE_NAME, "e")
+                    .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f")
+                    .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s")
+                    .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t")
+                    .put(Backend.SYSTEM_MGMT_LOG_NAME, "m")
+                    .put(Backend.SYSTEM_TX_LOG_NAME, "l")
+                    .build();
+
+    private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
+
+    static {
+        // Verify that shortCfNameMap is injective
+        // Should be guaranteed by Guava BiMap, but it doesn't hurt to check
+        Preconditions.checkArgument(null != SHORT_CF_NAME_MAP);
+        Collection<String> shorts = SHORT_CF_NAME_MAP.values();
+        Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size());
+    }
+
+    // Immutable instance fields
+    private final String tableName;
+    private final String compression;
+    private final int regionCount;
+    private final int regionsPerServer;
+    private final ConnectionMask cnx;
+    private final org.apache.hadoop.conf.Configuration hconf;
+    private final boolean shortCfNames;
+    private final boolean skipSchemaCheck;
+    private final String compatClass;
+    private final HBaseCompat compat;
+
+    private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers =
+            new ConcurrentHashMap<HBaseStoreManager, Throwable>();
+
+    // Mutable instance state
+    private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
+
+    private LocalLockMediator<StoreTransaction> llm;
+
+    /**
+     * Creates a store manager from Titan's storage configuration.
+     * <p/>
+     * The constructor reads the table/region/compat options, builds a Hadoop
+     * configuration (hbase-default.xml and hbase-site.xml overlaid with every
+     * key found under {@code HBASE_CONFIGURATION_NAMESPACE} and, if set, the
+     * storage host list as the ZooKeeper quorum), and opens the HBase
+     * connection through the version-specific {@link HBaseCompat} layer.
+     *
+     * @param config Titan storage configuration
+     * @throws BackendException if the HBase connection cannot be created
+     *         (reported as {@link PermanentBackendException})
+     */
+    public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException {
+        super(config, PORT_DEFAULT);
+
+        checkConfigDeprecation(config);
+
+        this.tableName = config.get(HBASE_TABLE);
+        this.compression = config.get(COMPRESSION);
+        this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1;
+        this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1;
+        this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK);
+        this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null;
+        this.compat = HBaseCompatLoader.getCompat(compatClass);
+
+        /*
+         * Specifying both region count options is permitted but may be
+         * indicative of a misunderstanding, so issue a warning.
+         */
+        if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) {
+            logger.warn("Both {} and {} are set in Titan's configuration, but "
+                      + "the former takes precedence and the latter will be ignored.",
+                        REGION_COUNT, REGIONS_PER_SERVER);
+        }
+
+        /* This static factory calls HBaseConfiguration.addHbaseResources(),
+         * which in turn applies the contents of hbase-default.xml and then
+         * applies the contents of hbase-site.xml.
+         */
+        this.hconf = HBaseConfiguration.create();
+
+        // Copy a subset of our commons config into a Hadoop config
+        int keysLoaded=0;
+        Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE);
+        for (Map.Entry<String,Object> entry : configSub.entrySet()) {
+            logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
+            // Hadoop's Configuration cannot hold null values, so such keys are skipped
+            if (entry.getValue()==null) continue;
+            hconf.set(entry.getKey(), entry.getValue().toString());
+            keysLoaded++;
+        }
+
+        // Special case for STORAGE_HOSTS
+        if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) {
+            String zkQuorumKey = "hbase.zookeeper.quorum";
+            String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS));
+            hconf.set(zkQuorumKey, csHostList);
+            logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList);
+        }
+
+        logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded);
+
+        this.shortCfNames = config.get(SHORT_CF_NAMES);
+
+        try {
+            this.cnx = compat.createConnection(hconf);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        // Diagnostic bookkeeping only: remember where this manager was opened
+        if (logger.isTraceEnabled()) {
+            openManagers.put(this, new Throwable("Manager Opened"));
+            dumpOpenManagers();
+        }
+
+        // Walking every Hadoop config entry is wasted work unless debug output is on
+        if (logger.isDebugEnabled()) {
+            logger.debug("Dumping HBase config key=value pairs");
+            for (Map.Entry<String, String> entry : hconf) {
+                logger.debug("[HBaseConfig] {}={}", entry.getKey(), entry.getValue());
+            }
+            logger.debug("End of HBase config key=value pairs");
+        }
+
+        openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>();
+    }
+
+    /**
+     * Reports {@code LOCAL} when at least one key range of the table is hosted
+     * on this machine (per {@link #getLocalKeyPartition()}), otherwise
+     * {@code REMOTE}.
+     *
+     * @throws RuntimeException wrapping any {@link BackendException} raised
+     *         while looking up the local partitions
+     */
+    @Override
+    public Deployment getDeployment() {
+        final List<KeyRange> localRanges;
+        try {
+            localRanges = getLocalKeyPartition();
+        } catch (BackendException e) {
+            // propagating StorageException might be a better approach
+            throw new RuntimeException(e);
+        }
+
+        if (null == localRanges || localRanges.isEmpty()) {
+            return Deployment.REMOTE;
+        }
+        return Deployment.LOCAL;
+    }
+
+    /**
+     * @return {@code hbase[<table>@<super.toString()>]}
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("hbase[");
+        text.append(tableName).append("@").append(super.toString()).append("]");
+        return text.toString();
+    }
+
+    /**
+     * Writes every registered manager, together with the stack trace captured
+     * when it was opened, to the trace log.
+     * <p/>
+     * The printed size is only an estimate because the registry may change
+     * while it is being iterated.
+     */
+    public void dumpOpenManagers() {
+        int estimatedSize = openManagers.size();
+        logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize);
+        // Iterate entries so key and value come from the same snapshot; a separate
+        // get() could return null if the manager is closed concurrently.
+        for (Map.Entry<HBaseStoreManager, Throwable> opened : openManagers.entrySet()) {
+            // The trailing Throwable is not bound to a placeholder; SLF4J logs it as a stack trace
+            logger.trace("Manager {} opened at:", opened.getKey(), opened.getValue());
+        }
+        logger.trace("----   End open HBase store manager list ({} managers)  ----", estimatedSize);
+    }
+
+    /**
+     * Forgets all opened stores, unregisters this manager from the diagnostic
+     * registry and closes the HBase connection, ignoring errors on close.
+     */
+    @Override
+    public void close() {
+        openStores.clear();
+        // Always unregister: the trace level may have changed since construction,
+        // and a skipped removal would pin this manager in the static map forever.
+        // Removing an absent key is a harmless no-op.
+        openManagers.remove(this);
+        IOUtils.closeQuietly(cnx);
+    }
+
+    /**
+     * Describes the capabilities of this backend: ordered and unordered scans,
+     * batch mutation, multi-query, distribution, key ordering, store TTL,
+     * timestamps and locking. Whether a local key partition is reported
+     * depends on {@link #getDeployment()}; if that lookup fails the flag is
+     * simply left at the builder's default and a warning is logged.
+     */
+    @Override
+    public StoreFeatures getFeatures() {
+
+        final Configuration keyConsistency = GraphDatabaseConfiguration.buildConfiguration();
+
+        final StandardStoreFeatures.Builder features = new StandardStoreFeatures.Builder()
+                .orderedScan(true)
+                .unorderedScan(true)
+                .batchMutation(true)
+                .multiQuery(true)
+                .distributed(true)
+                .keyOrdered(true)
+                .storeTTL(true)
+                .timestamps(true)
+                .preferredTimestamps(PREFERRED_TIMESTAMPS)
+                .locking(true)
+                .keyConsistent(keyConsistency);
+
+        try {
+            final boolean hostedLocally = Deployment.LOCAL == getDeployment();
+            features.localKeyPartition(hostedLocally);
+        } catch (Exception e) {
+            logger.warn("Unexpected exception during getDeployment()", e);
+        }
+
+        return features.build();
+    }
+
+    /**
+     * Applies the given mutations to the HBase table as one client-side batch.
+     * <p/>
+     * All mutations that target the same row key are folded into at most one
+     * {@code Put} and one {@code Delete}, stamped with the transaction's
+     * addition and deletion times respectively.
+     *
+     * @param mutations store name -> (row key -> mutation)
+     * @param txh       transaction supplying the commit timestamps
+     * @throws BackendException a {@link TemporaryBackendException} if the batch
+     *         fails with an I/O error or the calling thread is interrupted
+     */
+    @Override
+    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
+        logger.debug("Enter mutateMany");
+        final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+        // In case of an addition and deletion with identical timestamps, the
+        // deletion tombstone wins.
+        // http://hbase.apache.org/book/versions.html#d244e4250
+        Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey =
+                convertToCommands(
+                        mutations,
+                        commitTime.getAdditionTime(times.getUnit()),
+                        commitTime.getDeletionTime(times.getUnit()));
+
+        List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation
+
+        // convert sorted commands into representation required for 'batch' operation
+        for (Pair<Put, Delete> commands : commandsPerKey.values()) {
+            if (commands.getFirst() != null)
+                batch.add(commands.getFirst());
+
+            if (commands.getSecond() != null)
+                batch.add(commands.getSecond());
+        }
+
+        TableMask table = null;
+        try {
+            table = cnx.getTable(tableName);
+            logger.debug("mutateMany : batch mutate started size {} ", batch.size());
+            table.batch(batch, new Object[batch.size()]);
+            logger.debug("mutateMany : batch mutate finished {} ", batch.size());
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can
+            // still observe that this thread was asked to stop.
+            Thread.currentThread().interrupt();
+            throw new TemporaryBackendException(e);
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+
+        sleepAfterWrite(txh, commitTime);
+    }
+
+    /**
+     * Opens the named store without a TTL; delegates to
+     * {@link #openDatabase(String, int)} with {@code -1}.
+     */
+    @Override
+    public KeyColumnValueStore openDatabase(String longName) throws BackendException {
+        final int noTtl = -1;
+        return openDatabase(longName, noTtl);
+    }
+
+    /**
+     * Returns the store registered under {@code longName}, creating and
+     * caching it on first use.
+     * <p/>
+     * The thread that wins the registration also makes sure the backing column
+     * family exists (unless schema checks are disabled). If that check fails,
+     * the freshly cached store is evicted again so that a later call retries
+     * instead of silently returning a store whose column family may be missing.
+     *
+     * @param longName     long-form store name; shortened to the column family
+     *                     name when short CF names are enabled
+     * @param ttlInSeconds TTL for a newly created column family; values
+     *                     {@code <= 0} leave the TTL unset
+     * @throws BackendException if the column family cannot be verified or created
+     */
+    @Override
+    public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException {
+
+        HBaseKeyColumnValueStore store = openStores.get(longName);
+
+        if (store == null) {
+            final String cfName = shortCfNames ? shortenCfName(longName) : longName;
+
+            final String llmPrefix = getName();
+            llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times);
+            HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm);
+
+            store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we lose to another thread
+
+            if (store == null) {
+                if (!skipSchemaCheck) {
+                    try {
+                        ensureColumnFamilyExists(tableName, cfName, ttlInSeconds);
+                    } catch (BackendException | RuntimeException e) {
+                        // Do not keep a store whose schema was never verified
+                        openStores.remove(longName, newStore);
+                        throw e;
+                    }
+                }
+
+                store = newStore;
+            }
+            logger.info("Loaded 1.x Hbase Client Store Manager");
+        }
+
+        return store;
+    }
+
+
+    /**
+     * Starts a new HBase transaction bound to this manager's current local
+     * lock mediator.
+     * NOTE(review): the mediator is only assigned in openDatabase(), so it is
+     * null for transactions begun before any store was opened - confirm
+     * callers never do that.
+     */
+    @Override
+    public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
+        final HBaseTransaction tx = new HBaseTransaction(config, llm);
+        return tx;
+    }
+
+    /**
+     * @return the configured HBase table name, which doubles as this manager's name
+     */
+    @Override
+    public String getName() {
+        return this.tableName;
+    }
+
+    /**
+     * Deletes the specified table with all its columns.
+     * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss.
+     *
+     * @throws BackendException a {@link TemporaryBackendException} if clearing
+     *         the table or closing the admin handle fails with an I/O error
+     */
+    @Override
+    public void clearStorage() throws BackendException {
+        // try-with-resources: the admin handle is closed even when clearTable throws
+        try (AdminMask admin = getAdminInterface()) {
+            admin.clearTable(this.tableName, times.getTime().getNativeTimestamp());
+        } catch (IOException ioe) {
+            throw new TemporaryBackendException(ioe);
+        }
+    }
+
+    /**
+     * Lists the key ranges of this table whose regions are served from the
+     * local host.
+     * <p/>
+     * The table is created first if it does not exist yet. I/O problems while
+     * talking to HBase are logged and result in whatever ranges were collected
+     * so far (possibly none) being returned.
+     *
+     * @return the locally hosted key ranges; never {@code null}
+     * @throws BackendException if the table cannot be verified or created
+     */
+    @Override
+    public List<KeyRange> getLocalKeyPartition() throws BackendException {
+
+        final List<KeyRange> localRanges = new LinkedList<KeyRange>();
+
+        HTable regionTable = null;
+        try {
+            ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0);
+
+            regionTable = new HTable(hconf, tableName);
+
+            final Map<KeyRange, ServerName> serversByRange =
+                    normalizeKeyBounds(regionTable.getRegionLocations());
+
+            for (Map.Entry<KeyRange, ServerName> location : serversByRange.entrySet()) {
+                final ServerName server = location.getValue();
+                if (!NetworkUtil.isLocalConnection(server.getHostname())) {
+                    logger.debug("Discarding remote {}", server);
+                    continue;
+                }
+                localRanges.add(location.getKey());
+                logger.debug("Found local key/row partition {} on host {}", location.getKey(), server);
+            }
+        } catch (MasterNotRunningException e) {
+            logger.warn("Unexpected MasterNotRunningException", e);
+        } catch (ZooKeeperConnectionException e) {
+            logger.warn("Unexpected ZooKeeperConnectionException", e);
+        } catch (IOException e) {
+            logger.warn("Unexpected IOException", e);
+        } finally {
+            IOUtils.closeQuietly(regionTable);
+        }
+        return localRanges;
+    }
+
+    /**
+     * Given a map produced by {@link HTable#getRegionLocations()}, transform
+     * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the
+     * region's start and end key bounds using Titan-partitioning-friendly
+     * conventions (zero bytes appended where necessary to make all keys at
+     * least 4 bytes long).
+     * <p/>
+     * This method iterates over the entries in its map parameter and performs
+     * the following conditional conversions on its keys. "Require" below means
+     * a {@link Preconditions} invocation. HRegionInfo sometimes returns start
+     * and end keys of zero length; this method replaces zero length keys with
+     * null before doing any of the checks described below. The parameter map
+     * and the values it contains are only read and never modified.
+     *
+     * <ul>
+     * <li>If an entry's HRegionInfo has null start and end keys, then first
+     * require that the parameter map is a singleton, and then return a
+     * single-entry map whose {@code KeyRange} has start and end buffers that
+     * are both four bytes of zeros.</li>
+     * <li>If the entry has a null start key (but non-null end key), put an
+     * equivalent entry in the result map where the start key is four bytes of
+     * zeros, and the end key has zeros appended, if necessary, to make it at
+     * least 4 bytes long.</li>
+     * <li>If the entry has a null end key (but non-null start key), put an
+     * equivalent entry in the result map with a start key identical to the
+     * input, except that zeros are appended to values less than 4 bytes long,
+     * and an end key that is four bytes of zeros.</li>
+     * <li>Any entry which matches none of the above criteria results in an
+     * equivalent entry in the returned map, except that zeros are appended to
+     * both keys to make each at least 4 bytes long.</li>
+     * </ul>
+     *
+     * Note that the end key is only zero-extended; it is never incremented.
+     * <p/>
+     * While iterating, the method requires that at most one entry has a null
+     * start key and at most one has a null end key. After iterating, it
+     * requires that it either saw no entries with null keys, or one entry with
+     * a null start key and a different entry with a null end key (the case of
+     * one entry with both keys null returns early, see above). Any other
+     * combination dies with a precondition failure. Finally it requires that
+     * every key in the result is at least 4 bytes long.
+     *
+     * @param raw
+     *            A map of HRegionInfo and ServerName from HBase
+     * @return Titan-friendly expression of each region's rowkey boundaries
+     */
+    private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) {
+
+        // Remember the (at most one) region with an open start and the one with an open end
+        Map.Entry<HRegionInfo, ServerName> nullStart = null;
+        Map.Entry<HRegionInfo, ServerName> nullEnd = null;
+
+        ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder();
+
+        for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) {
+            HRegionInfo regionInfo = e.getKey();
+            byte startKey[] = regionInfo.getStartKey();
+            byte endKey[]   = regionInfo.getEndKey();
+
+            if (0 == startKey.length) {
+                startKey = null;
+                logger.trace("Converted zero-length HBase startKey byte array to null");
+            }
+
+            if (0 == endKey.length) {
+                endKey = null;
+                logger.trace("Converted zero-length HBase endKey byte array to null");
+            }
+
+            if (null == startKey && null == endKey) {
+                // A region open on both sides must be the table's only region
+                Preconditions.checkState(1 == raw.size());
+                logger.debug("HBase table {} has a single region {}", tableName, regionInfo);
+                // Choose arbitrary shared value = startKey = endKey
+                return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build();
+            } else if (null == startKey) {
+                logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo);
+                Preconditions.checkState(null == nullStart);
+                nullStart = e;
+                // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive,
+                // so the end key is used as-is (only zero-extended)
+                StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
+                // Replace null start key with zeroes
+                b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue());
+            } else if (null == endKey) {
+                logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo);
+                Preconditions.checkState(null == nullEnd);
+                nullEnd = e;
+                // Replace null end key with zeroes
+                b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue());
+            } else {
+                // Both bounds are present here; these checks merely restate the branch condition
+                Preconditions.checkState(null != startKey);
+                Preconditions.checkState(null != endKey);
+
+                // Pad both bounds to at least 4 bytes; no inclusive/exclusive conversion is applied
+                StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey));
+                StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
+
+                KeyRange kr = new KeyRange(startBuf, endBuf);
+                b.put(kr, e.getValue());
+                logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo);
+            }
+        }
+
+        // Require either no null key bounds or a pair of them
+        Preconditions.checkState(!(null == nullStart ^ null == nullEnd));
+
+        // Check that every key in the result is at least 4 bytes long
+        Map<KeyRange, ServerName> result = b.build();
+        for (KeyRange kr : result.keySet()) {
+            Preconditions.checkState(4 <= kr.getStart().length());
+            Preconditions.checkState(4 <= kr.getEnd().length());
+        }
+
+        return result;
+    }
+
+    /**
+     * Right-pads a byte array with zero bytes up to a length of 4. Arrays that
+     * are already 4 bytes or longer are returned unchanged (same instance).
+     *
+     * @param dataToPad non-null but possibly zero-length byte array
+     * @return the parameter itself, or a new 4-byte array that starts with the
+     *         parameter's bytes
+     */
+    private final byte[] zeroExtend(byte[] dataToPad) {
+        assert null != dataToPad;
+
+        final int minLength = 4;
+        final int given = dataToPad.length;
+
+        if (given >= minLength)
+            return dataToPad;
+
+        // A freshly allocated array is already zero-filled, so only the
+        // leading bytes need to be copied over.
+        final byte[] extended = new byte[minLength];
+        System.arraycopy(dataToPad, 0, extended, 0, given);
+        return extended;
+    }
+
+    /**
+     * Maps a long column family name to its registered short form.
+     *
+     * @param longName long-form column family name
+     * @return the short form if one is registered, otherwise {@code longName} itself
+     * @throws PermanentBackendException if {@code longName} is itself one of the
+     *         registered short forms, which would be ambiguous
+     */
+    public static String shortenCfName(String longName) throws PermanentBackendException {
+        if (SHORT_CF_NAME_MAP.containsKey(longName)) {
+            final String shortForm = SHORT_CF_NAME_MAP.get(longName);
+            Preconditions.checkNotNull(shortForm);
+            logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, shortForm);
+            return shortForm;
+        }
+
+        if (SHORT_CF_NAME_MAP.containsValue(longName)) {
+            final String template = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true";
+            throw new PermanentBackendException(
+                    String.format(template, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName()));
+        }
+
+        logger.debug("Kept default CF name \"{}\" because it has no associated short form", longName);
+        return longName;
+    }
+
+    /**
+     * Returns the descriptor of the given table, creating the table with one
+     * initial column family if it does not exist yet.
+     *
+     * @param tableName     table to look up or create
+     * @param initialCFName column family to create together with a new table
+     * @param ttlInSeconds  TTL applied to that column family when positive
+     * @return descriptor of the existing or newly created table
+     * @throws BackendException a {@link TemporaryBackendException} on I/O failure
+     */
+    private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException {
+        AdminMask admin = null;
+        try {
+            admin = getAdminInterface();
+            /*
+             * Some HBase versions/impls respond badly to attempts to create a
+             * table without at least one CF. See #661. Creating a CF along with
+             * the table avoids HBase carping.
+             */
+            return admin.tableExists(tableName)
+                    ? admin.getTableDescriptor(tableName)
+                    : createTable(tableName, initialCFName, ttlInSeconds, admin);
+        } catch (IOException ioe) {
+            throw new TemporaryBackendException(ioe);
+        } finally {
+            IOUtils.closeQuietly(admin);
+        }
+    }
+
+    /**
+     * Creates the table with a single column family, pre-split into regions
+     * when a usable region count is configured.
+     * <p/>
+     * The region count comes from the explicit region-count setting if it is at
+     * least {@code MIN_REGION_COUNT}; otherwise from regions-per-server times
+     * the estimated number of region servers, again only if that product is at
+     * least {@code MIN_REGION_COUNT}. Without a usable count the table is
+     * created with HBase's default layout.
+     *
+     * @param tableName    name of the table to create
+     * @param cfName       name of the initial column family
+     * @param ttlInSeconds TTL applied to the column family when positive
+     * @param adm          admin handle used for the creation; not closed here
+     * @return descriptor the table was created from
+     * @throws IOException if HBase rejects or fails the creation
+     */
+    private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException {
+        HTableDescriptor desc = compat.newTableDescriptor(tableName);
+
+        HColumnDescriptor cdesc = new HColumnDescriptor(cfName);
+        setCFOptions(cdesc, ttlInSeconds);
+
+        compat.addColumnFamilyToTableDescriptor(desc, cdesc);
+
+        int count = -1; // total regions to create; -1 means "use the HBase default"
+        String src = "default";
+
+        if (MIN_REGION_COUNT <= regionCount) {
+            count = regionCount;
+            src = "region count configuration";
+        } else if (0 < regionsPerServer) {
+            // Only ask the cluster for its size when the per-server setting is in play
+            int estimated = regionsPerServer * adm.getEstimatedRegionServerCount();
+            if (MIN_REGION_COUNT <= estimated) {
+                count = estimated;
+                src = "ClusterStatus server count";
+            }
+        }
+
+        // Use the same inclusive bound as the selection above. The former strict
+        // comparison dropped a count equal to MIN_REGION_COUNT and silently
+        // created the table with the default layout instead.
+        if (MIN_REGION_COUNT <= count) {
+            adm.createTable(desc, getStartKey(count), getEndKey(count), count);
+            logger.debug("Created table {} with region count {} from {}", tableName, count, src);
+        } else {
+            adm.createTable(desc);
+            logger.debug("Created table {} with default start key, end key, and region count", tableName);
+        }
+
+        return desc;
+    }
+
+    /**
+     * This method generates the second argument to
+     * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}.
+     * <p/>
+     * From the {@code createTable} javadoc:
+     * "The start key specified will become the end key of the first region of
+     * the table, and the end key specified will become the start key of the
+     * last region of the table (the first region has a null start key and
+     * the last region has a null end key)"
+     * <p/>
+     * To summarize, the {@code createTable} argument called "startKey" is
+     * actually the end key of the first region.
+     */
+    private byte[] getStartKey(int regionCount) {
+        ByteBuffer regionWidth = ByteBuffer.allocate(4);
+        regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip();
+        return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
+    }
+
+    /**
+     * Companion to {@link #getStartKey(int)}. See its javadoc for details.
+     */
+    private byte[] getEndKey(int regionCount) {
+        ByteBuffer regionWidth = ByteBuffer.allocate(4);
+        regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip();
+        return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
+    }
+
+    /**
+     * Makes sure the given column family exists in the given table, creating
+     * the table and/or the column family when missing.
+     * <p/>
+     * Adding a column family is done by disabling the table, adding the
+     * column and enabling the table again.
+     *
+     * @param tableName    table that must contain the column family
+     * @param columnFamily column family name
+     * @param ttlInSeconds TTL applied to a newly created column family when positive
+     * @throws BackendException {@link PermanentBackendException} if the table
+     *         disappears while the column is being added,
+     *         {@link TemporaryBackendException} for other I/O failures
+     */
+    private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException {
+        AdminMask adm = null;
+        try {
+            adm = getAdminInterface();
+            // ensureTableExists() opens and closes its own admin handle
+            HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds);
+
+            Preconditions.checkNotNull(desc);
+
+            // NOTE(review): getBytes() uses the platform default charset - confirm
+            // that column family names are always ASCII.
+            HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes());
+
+            // Create our column family, if necessary
+            if (cf == null) {
+                try {
+                    if (!adm.isTableDisabled(tableName)) {
+                        adm.disableTable(tableName);
+                    }
+                } catch (TableNotEnabledException e) {
+                    // Raced with another disable; the table is in the state we need
+                    logger.debug("Table {} already disabled", tableName);
+                } catch (IOException e) {
+                    throw new TemporaryBackendException(e);
+                }
+
+                try {
+                    HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily);
+
+                    setCFOptions(cdesc, ttlInSeconds);
+
+                    adm.addColumn(tableName, cdesc);
+
+                    // NOTE(review): the message mentions a 1 second wait, but no wait happens in this method
+                    logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
+
+                    // NOTE(review): if addColumn() throws, this line is skipped and the table stays disabled
+                    adm.enableTable(tableName);
+                } catch (TableNotFoundException ee) {
+                    logger.error("TableNotFoundException", ee);
+                    throw new PermanentBackendException(ee);
+                } catch (org.apache.hadoop.hbase.TableExistsException ee) {
+                    // Deliberately ignored apart from the debug log
+                    logger.debug("Swallowing exception {}", ee);
+                } catch (IOException ee) {
+                    throw new TemporaryBackendException(ee);
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(adm);
+        }
+    }
+
+    private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) {
+        if (null != compression && !compression.equals(COMPRESSION_DEFAULT))
+            compat.setCompression(cdesc, compression);
+
+        if (ttlInSeconds > 0)
+            cdesc.setTimeToLive(ttlInSeconds);
+
+        cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+    }
+
+    /**
+     * Convert Titan internal Mutation representation into HBase native commands.
+     *
+     * @param mutations    Mutations to convert into HBase commands.
+     * @param putTimestamp The timestamp to use for Put commands.
+     * @param delTimestamp The timestamp to use for Delete commands.
+     * @return Commands sorted by key converted from Titan internal representation.
+     * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException
+     */
+    private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations,
+                                                                   final long putTimestamp,
+                                                                   final long delTimestamp) throws PermanentBackendException {
+        Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>();
+
+        for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) {
+
+            String cfString = getCfNameForStoreName(entry.getKey());
+            byte[] cfName = cfString.getBytes();
+
+            for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) {
+                byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY);
+                KCVMutation mutation = m.getValue();
+
+                Pair<Put, Delete> commands = commandsPerKey.get(m.getKey());
+
+                if (commands == null) {
+                    commands = new Pair<Put, Delete>();
+                    commandsPerKey.put(m.getKey(), commands);
+                }
+
+                if (mutation.hasDeletions()) {
+                    if (commands.getSecond() == null) {
+                        Delete d = new Delete(key);
+                        compat.setTimestamp(d, delTimestamp);
+                        commands.setSecond(d);
+                    }
+
+                    for (StaticBuffer b : mutation.getDeletions()) {
+                        commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp);
+                    }
+                }
+
+                if (mutation.hasAdditions()) {
+                    if (commands.getFirst() == null) {
+                        Put p = new Put(key, putTimestamp);
+                        commands.setFirst(p);
+                    }
+
+                    for (Entry e : mutation.getAdditions()) {
+                        commands.getFirst().add(cfName,
+                                e.getColumnAs(StaticBuffer.ARRAY_FACTORY),
+                                putTimestamp,
+                                e.getValueAs(StaticBuffer.ARRAY_FACTORY));
+                    }
+                }
+            }
+        }
+
+        return commandsPerKey;
+    }
+
+    /**
+     * Resolves the column family name for a store: the short form when short
+     * CF names are enabled, otherwise the store name unchanged.
+     *
+     * @throws PermanentBackendException propagated from {@link #shortenCfName(String)}
+     */
+    private String getCfNameForStoreName(String storeName) throws PermanentBackendException {
+        if (shortCfNames) {
+            return shortenCfName(storeName);
+        }
+        return storeName;
+    }
+
+    /**
+     * Warns when the generic storage port option is set, since this backend
+     * ignores it in favour of the ZooKeeper client port setting.
+     *
+     * @param config Titan storage configuration to inspect
+     */
+    private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) {
+        if (!config.has(GraphDatabaseConfiguration.STORAGE_PORT)) {
+            return;
+        }
+
+        logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.",
+                ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT),
+                ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE));
+    }
+
+    private AdminMask getAdminInterface() {
+        try {
+            return cnx.getAdmin();
+        } catch (IOException e) {
+            throw new TitanException(e);
+        }
+    }
+
+    /**
+     * A one-argument function in the spirit of {@link Function} whose
+     * {@code apply} method may fail with a {@link BackendException}.
+     *
+     * @param <F> input type
+     * @param <T> result type
+     */
+    private interface BackendFunction<F, T> {
+
+        /**
+         * @param input value to transform
+         * @return the transformed value
+         * @throws BackendException if the transformation fails
+         */
+        T apply(F input) throws BackendException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
new file mode 100644
index 0000000..e13593f
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * HBase-specific store transaction.
+ * <p/>
+ * Besides giving HBase its own transaction type (which lets us check for user
+ * errors like passing a Cassandra transaction into a HBase method), this class
+ * remembers every local lock registered through
+ * {@link #updateLocks(KeyColumn, StaticBuffer)} and releases them on the
+ * {@link LocalLockMediator} when the transaction commits or rolls back.
+ * <p/>
+ * All access to the lock set is synchronized on the transaction instance.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class HBaseTransaction extends AbstractStoreTransaction {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class);
+
+    // Mediator on which the tracked locks are released
+    LocalLockMediator<StoreTransaction> llm;
+
+    // Locks registered via updateLocks(), in registration order
+    Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>();
+
+    /**
+     * @param config transaction configuration passed on to the superclass
+     * @param llm    mediator used to release the locks tracked by this transaction
+     */
+    public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) {
+        super(config);
+        this.llm = llm;
+    }
+
+    /**
+     * Rolls back the transaction and then releases all tracked locks.
+     */
+    @Override
+    public synchronized void rollback() throws BackendException {
+        super.rollback();
+        log.debug("Rolled back transaction");
+        deleteAllLocks();
+    }
+
+    /**
+     * Commits the transaction and then releases all tracked locks.
+     */
+    @Override
+    public synchronized void commit() throws BackendException {
+        super.commit();
+        log.debug("Committed transaction");
+        deleteAllLocks();
+    }
+
+    /**
+     * Registers a lock so that it is released when the transaction ends.
+     * Synchronized because commit/rollback iterate the same (not thread-safe)
+     * set under this monitor.
+     *
+     * @param lockID        key/column pair that was locked
+     * @param expectedValue currently unused
+     */
+    public synchronized void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) {
+        keyColumnLocks.add(lockID);
+    }
+
+    /**
+     * Releases every tracked lock and forgets it, so that a later commit or
+     * rollback on the same transaction does not try to unlock it a second time.
+     * Only called with the instance monitor held.
+     */
+    private void deleteAllLocks() {
+        for(KeyColumn kc : keyColumnLocks) {
+            llm.unlock(kc, this);
+            log.debug("Removed lock {} ", kc);
+        }
+        keyColumnLocks.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
new file mode 100644
index 0000000..8660644
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+
+/**
+ * {@link ConnectionMask} implementation that delegates to an HBase
+ * {@link HConnection}.
+ */
+public class HConnection0_98 implements ConnectionMask
+{
+
+    /** The wrapped HBase connection. */
+    private final HConnection connection;
+
+    /**
+     * @param cnx the HBase connection to wrap
+     */
+    public HConnection0_98(HConnection cnx)
+    {
+        connection = cnx;
+    }
+
+    /**
+     * Looks up the named table on the wrapped connection and wraps it in an
+     * {@link HTable0_98}.
+     */
+    @Override
+    public TableMask getTable(String name) throws IOException
+    {
+        final TableMask wrappedTable = new HTable0_98(connection.getTable(name));
+        return wrappedTable;
+    }
+
+    /**
+     * Creates a new {@link HBaseAdmin} on the wrapped connection and wraps it
+     * in an {@link HBaseAdmin0_98}.
+     */
+    @Override
+    public AdminMask getAdmin() throws IOException
+    {
+        final HBaseAdmin admin = new HBaseAdmin(connection);
+        return new HBaseAdmin0_98(admin);
+    }
+
+    /** Closes the wrapped connection. */
+    @Override
+    public void close() throws IOException
+    {
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
new file mode 100644
index 0000000..91e5026
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+/**
+ * {@link ConnectionMask} implementation that delegates to an HBase
+ * {@link Connection}.
+ */
+public class HConnection1_0 implements ConnectionMask
+{
+
+    /** The wrapped HBase connection. */
+    private final Connection connection;
+
+    /**
+     * @param cnx the HBase connection to wrap
+     */
+    public HConnection1_0(Connection cnx)
+    {
+        connection = cnx;
+    }
+
+    /**
+     * Resolves {@code name} to a {@link TableName}, looks the table up on the
+     * wrapped connection and wraps it in an {@link HTable1_0}.
+     */
+    @Override
+    public TableMask getTable(String name) throws IOException
+    {
+        final TableName tableName = TableName.valueOf(name);
+        return new HTable1_0(connection.getTable(tableName));
+    }
+
+    /**
+     * Creates a new {@link HBaseAdmin} on the wrapped connection and wraps it
+     * in an {@link HBaseAdmin1_0}.
+     */
+    @Override
+    public AdminMask getAdmin() throws IOException
+    {
+        final HBaseAdmin admin = new HBaseAdmin(connection);
+        return new HBaseAdmin1_0(admin);
+    }
+
+    /** Closes the wrapped connection. */
+    @Override
+    public void close() throws IOException
+    {
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
new file mode 100644
index 0000000..4ddb2f0
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * {@link TableMask} implementation that delegates to an HBase
+ * {@link HTableInterface}.
+ */
+public class HTable0_98 implements TableMask
+{
+    /** The wrapped HBase table. */
+    private final HTableInterface delegate;
+
+    /**
+     * @param table the HBase table to wrap
+     */
+    public HTable0_98(HTableInterface table)
+    {
+        delegate = table;
+    }
+
+    /** Opens a scanner on the wrapped table for the given scan. */
+    @Override
+    public ResultScanner getScanner(Scan filter) throws IOException
+    {
+        final ResultScanner scanner = delegate.getScanner(filter);
+        return scanner;
+    }
+
+    /** Executes the given gets against the wrapped table. */
+    @Override
+    public Result[] get(List<Get> gets) throws IOException
+    {
+        final Result[] fetched = delegate.get(gets);
+        return fetched;
+    }
+
+    /**
+     * Submits the batch to the wrapped table and then flushes its commits.
+     */
+    @Override
+    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
+    {
+        delegate.batch(writes, results);
+        // Explicit flush after the batch call; the order must stay batch-then-flush.
+        delegate.flushCommits();
+    }
+
+    /** Closes the wrapped table. */
+    @Override
+    public void close() throws IOException
+    {
+        delegate.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
new file mode 100644
index 0000000..5085abb
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * {@link TableMask} implementation that delegates to an HBase {@link Table}.
+ */
+public class HTable1_0 implements TableMask
+{
+    /** The wrapped HBase table. */
+    private final Table delegate;
+
+    /**
+     * @param table the HBase table to wrap
+     */
+    public HTable1_0(Table table)
+    {
+        delegate = table;
+    }
+
+    /** Opens a scanner on the wrapped table for the given scan. */
+    @Override
+    public ResultScanner getScanner(Scan filter) throws IOException
+    {
+        final ResultScanner scanner = delegate.getScanner(filter);
+        return scanner;
+    }
+
+    /** Executes the given gets against the wrapped table. */
+    @Override
+    public Result[] get(List<Get> gets) throws IOException
+    {
+        final Result[] fetched = delegate.get(gets);
+        return fetched;
+    }
+
+    /**
+     * Submits the batch to the wrapped table. Unlike {@code HTable0_98}, no
+     * flushCommits() call follows: it is not needed anymore.
+     */
+    @Override
+    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
+    {
+        delegate.batch(writes, results);
+    }
+
+    /** Closes the wrapped table. */
+    @Override
+    public void close() throws IOException
+    {
+        delegate.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
new file mode 100644
index 0000000..dd3d61e
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface TableMask extends Closeable
+{
+
+    /**
+     * Opens a scanner over the underlying table.
+     *
+     * @param filter the scan describing what to read
+     * @return the scanner produced by the underlying table
+     * @throws IOException if the underlying table call fails
+     */
+    ResultScanner getScanner(Scan filter) throws IOException;
+
+    /**
+     * Executes several gets against the underlying table.
+     *
+     * @param gets the gets to execute
+     * @return the results returned by the underlying table
+     * @throws IOException if the underlying table call fails
+     */
+    Result[] get(List<Get> gets) throws IOException;
+
+    /**
+     * Submits a batch of row operations to the underlying table.
+     * Implementations may perform extra version-specific work afterwards
+     * (the 0.98 implementation also flushes commits).
+     *
+     * @param writes  the row operations to submit
+     * @param results array passed through to the underlying batch call
+     * @throws IOException          if the underlying table call fails
+     * @throws InterruptedException if the underlying batch call is interrupted
+     */
+    void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
new file mode 100644
index 0000000..20c59e1
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.locking;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
+import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class resolves lock contention between two transactions on the same JVM.
+ * <p/>
+ * This is not just an optimization to reduce network traffic. Locks written by
+ * Titan to a distributed key-value store contain an identifier, the "Rid",
+ * which is unique only to the process level. The Rid can't tell which
+ * transaction in a process holds any given lock. This class prevents two
+ * transactions in a single process from concurrently writing the same lock to a
+ * distributed key-value store.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+
+public class LocalLockMediator<T> {
+
+    private static final Logger log = LoggerFactory
+        .getLogger(LocalLockMediator.class);
+
+    /**
+     * Namespace for which this mediator is responsible
+     *
+     * @see LocalLockMediatorProvider
+     */
+    private final String name;
+
+    private final TimestampProvider times;
+
+    private DelayQueue<ExpirableKeyColumn> expiryQueue = new DelayQueue<>();
+
+    private ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable runnable) {
+            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
+
+
+
+    /**
+     * Maps a ({@code key}, {@code column}) pair to the local transaction
+     * holding a lock on that pair. Values in this map may have already expired
+     * according to {@link AuditRecord#expires}, in which case the lock should
+     * be considered invalid.
+     */
+    private final ConcurrentHashMap<KeyColumn, AuditRecord<T>> locks = new ConcurrentHashMap<KeyColumn, AuditRecord<T>>();
+
+    public LocalLockMediator(String name, TimestampProvider times) {
+        this.name = name;
+        this.times = times;
+
+        Preconditions.checkNotNull(name);
+        Preconditions.checkNotNull(times);
+        lockCleanerService.submit(new LockCleaner());
+    }
+
+    /**
+     * Acquire the lock specified by {@code kc}.
+     * <p/>
+     * <p/>
+     * For any particular key-column, whatever value of {@code requestor} is
+     * passed to this method must also be passed to the associated later call to
+     * {@link #unlock(KeyColumn, ExpectedValueCheckingTransaction)}.
+     * <p/>
+     * If some requestor {@code r} calls this method on a KeyColumn {@code k}
+     * and this method returns true, then subsequent calls to this method by
+     * {@code r} on {@code l} merely attempt to update the {@code expiresAt}
+     * timestamp. This differs from typical lock reentrance: multiple successful
+     * calls to this method do not require an equal number of calls to
+     * {@code #unlock()}. One {@code #unlock()} call is enough, no matter how
+     * many times a {@code requestor} called {@code lock} beforehand. Note that
+     * updating the timestamp may fail, in which case the lock is considered to
+     * have expired and the calling context should assume it no longer holds the
+     * lock specified by {@code kc}.
+     * <p/>
+     * The number of nanoseconds elapsed since the UNIX Epoch is not readily
+     * available within the JVM. When reckoning expiration times, this method
+     * uses the approximation implemented by
+     * {@link com.thinkaurelius.titan.diskstorage.util.NanoTime#getApproxNSSinceEpoch(false)}.
+     * <p/>
+     * The current implementation of this method returns true when given an
+     * {@code expiresAt} argument in the past. Future implementations may return
+     * false instead.
+     *
+     * @param kc        lock identifier
+     * @param requestor the object locking {@code kc}
+     * @param expires   instant at which this lock will automatically expire
+     * @return true if the lock is acquired, false if it was not acquired
+     */
+    public boolean lock(KeyColumn kc, T requestor, Timepoint expires) {
+        assert null != kc;
+        assert null != requestor;
+
+        AuditRecord<T> audit = new AuditRecord<T>(requestor, expires);
+        AuditRecord<T> inmap = locks.putIfAbsent(kc, audit);
+
+        boolean success = false;
+
+        if (null == inmap) {
+            // Uncontended lock succeeded
+            if (log.isTraceEnabled()) {
+                log.trace("New local lock created: {} namespace={} txn={}",
+                    new Object[]{kc, name, requestor});
+            }
+            success = true;
+        } else if (inmap.equals(audit)) {
+            // requestor has already locked kc; update expiresAt
+            success = locks.replace(kc, inmap, audit);
+            if (log.isTraceEnabled()) {
+                if (success) {
+                    log.trace(
+                        "Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
+                        new Object[]{kc, name, requestor, inmap.expires,
+                            audit.expires});
+                } else {
+                    log.trace(
+                        "Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
+                        new Object[]{kc, name, requestor, inmap.expires,
+                            audit.expires});
+                }
+            }
+        } else if (0 > inmap.expires.compareTo(times.getTime())) {
+            // the recorded lock has expired; replace it
+            success = locks.replace(kc, inmap, audit);
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Discarding expired lock: {} namespace={} txn={} expired={}",
+                    new Object[]{kc, name, inmap.holder, inmap.expires});
+            }
+        } else {
+            // we lost to a valid lock
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Local lock failed: {} namespace={} txn={} (already owned by {})",
+                    new Object[]{kc, name, requestor, inmap});
+            }
+        }
+
+        if (success) {
+            expiryQueue.add(new ExpirableKeyColumn(kc, expires));
+        }
+        return success;
+    }
+
+    /**
+     * Release the lock specified by {@code kc} and which was previously
+     * locked by {@code requestor}, if it is possible to release it.
+     *
+     * @param kc        lock identifier
+     * @param requestor the object which previously locked {@code kc}
+     */
+    public boolean unlock(KeyColumn kc, T requestor) {
+
+        if (!locks.containsKey(kc)) {
+            log.info("Local unlock failed: no locks found for {}", kc);
+            return false;
+        }
+
+        AuditRecord<T> unlocker = new AuditRecord<T>(requestor, null);
+
+        AuditRecord<T> holder = locks.get(kc);
+
+        if (!holder.equals(unlocker)) {
+            log.error("Local unlock of {} by {} failed: it is held by {}",
+                new Object[]{kc, unlocker, holder});
+            return false;
+        }
+
+        boolean removed = locks.remove(kc, unlocker);
+
+        if (removed) {
+            expiryQueue.remove(kc);
+            if (log.isTraceEnabled()) {
+                log.trace("Local unlock succeeded: {} namespace={} txn={}",
+                    new Object[]{kc, name, requestor});
+            }
+        } else {
+            log.warn("Local unlock warning: lock record for {} disappeared "
+                + "during removal; this suggests the lock either expired "
+                + "while we were removing it, or that it was erroneously "
+                + "unlocked multiple times.", kc);
+        }
+
+        // Even if !removed, we're finished unlocking, so return true
+        return true;
+    }
+
+    public String toString() {
+        return "LocalLockMediator [" + name + ",  ~" + locks.size()
+            + " current locks]";
+    }
+
+    /**
+     * A record containing the local transaction that holds a lock and the
+     * lock's expiration time.
+     */
+    private static class AuditRecord<T> {
+
+        /**
+         * The local transaction that holds/held the lock.
+         */
+        private final T holder;
+        /**
+         * The expiration time of a the lock.
+         */
+        private final Timepoint expires;
+        /**
+         * Cached hashCode.
+         */
+        private int hashCode;
+
+        private AuditRecord(T holder, Timepoint expires) {
+            this.holder = holder;
+            this.expires = expires;
+        }
+
+        /**
+         * This implementation depends only on the lock holder and not on the
+         * lock expiration time.
+         */
+        @Override
+        public int hashCode() {
+            if (0 == hashCode)
+                hashCode = holder.hashCode();
+
+            return hashCode;
+        }
+
+        /**
+         * This implementation depends only on the lock holder and not on the
+         * lock expiration time.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            /*
+             * This warning suppression is harmless because we are only going to
+             * call other.holder.equals(...), and since equals(...) is part of
+             * Object, it is guaranteed to be defined no matter the concrete
+             * type of parameter T.
+             */
+            @SuppressWarnings("rawtypes")
+            AuditRecord other = (AuditRecord) obj;
+            if (holder == null) {
+                if (other.holder != null)
+                    return false;
+            } else if (!holder.equals(other.holder))
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "AuditRecord [txn=" + holder + ", expires=" + expires + "]";
+        }
+
+    }
+
+    private class LockCleaner implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    log.debug("Lock Cleaner service started");
+                    ExpirableKeyColumn lock = expiryQueue.take();
+                    log.debug("Expiring key column " + lock.getKeyColumn());
+                    locks.remove(lock.getKeyColumn());
+                }
+            } catch (InterruptedException e) {
+                log.debug("Received interrupt. Exiting");
+            }
+        }
+    }
+
+    private static class ExpirableKeyColumn implements Delayed {
+
+        private Timepoint expiryTime;
+        private KeyColumn kc;
+
+        public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime) {
+            this.kc = keyColumn;
+            this.expiryTime = expiryTime;
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            return expiryTime.getTimestamp(TimeUnit.NANOSECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) < ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
+                return -1;
+            }
+            if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) > ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
+                return 1;
+            }
+            return 0;
+        }
+
+        public KeyColumn getKeyColumn() {
+            return kc;
+        }
+    }
+}


Mime
View raw message