Return-Path: X-Original-To: apmail-atlas-commits-archive@minotaur.apache.org Delivered-To: apmail-atlas-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 96BE018619 for ; Thu, 3 Dec 2015 05:46:25 +0000 (UTC) Received: (qmail 31705 invoked by uid 500); 3 Dec 2015 05:46:25 -0000 Delivered-To: apmail-atlas-commits-archive@atlas.apache.org Received: (qmail 31672 invoked by uid 500); 3 Dec 2015 05:46:25 -0000 Mailing-List: contact commits-help@atlas.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.incubator.apache.org Delivered-To: mailing list commits@atlas.incubator.apache.org Received: (qmail 31663 invoked by uid 99); 3 Dec 2015 05:46:25 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Dec 2015 05:46:25 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id B0DA7CC381 for ; Thu, 3 Dec 2015 05:46:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 2pQ8GbVJKt3r for ; Thu, 3 Dec 2015 05:46:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 9868F47BCB for ; Thu, 3 Dec 2015 05:46:13 +0000 (UTC) Received: (qmail 31575 invoked by uid 99); 3 Dec 2015 05:46:12 -0000 Received: from git1-us-west.apache.org (HELO 
git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Dec 2015 05:46:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7384E67E0; Thu, 3 Dec 2015 05:46:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sumasai@apache.org To: commits@atlas.incubator.apache.org Date: Thu, 03 Dec 2015 05:46:13 -0000 Message-Id: <3cfd95add7eb495d8a34633cbd8b6c7f@git.apache.org> In-Reply-To: <9103f65cdad040eda75f65433612e13e@git.apache.org> References: <9103f65cdad040eda75f65433612e13e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] incubator-atlas git commit: ATLAS-352 Improve write performance on type and entity creation with Hbase(sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java new file mode 100644 index 0000000..b4dc12e --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java @@ -0,0 +1,397 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.thinkaurelius.titan.core.attribute.Duration; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; +import com.thinkaurelius.titan.diskstorage.util.RecordIterator; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList; +import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; +import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.util.system.IOUtils; + +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*; + +/** + * Here are some areas that might need work: + *

+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false)) + * - tuning HTable#setWriteBufferSize (?) + * - writing a server-side filter to replace ColumnCountGetFilter, which drops + * all columns on the row where it reaches its limit. This requires getSlice, + * currently, to impose its limit on the client side. That obviously won't + * scale. + * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this) + * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache + *

+ * There may be other problem areas. These are just the ones of which I'm aware. + */ +public class HBaseKeyColumnValueStore implements KeyColumnValueStore { + + private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class); + + private final String tableName; + private final HBaseStoreManager storeManager; + + // When using shortened CF names, columnFamily is the shortname and storeName is the longname + // When not using shortened CF names, they are the same + //private final String columnFamily; + private final String storeName; + // This is columnFamily.getBytes() + private final byte[] columnFamilyBytes; + private final HBaseGetter entryGetter; + + private final ConnectionMask cnx; + + private LocalLockMediator localLockMediator; + + private Duration lockExpiryTime; + + HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator llm) { + this.storeManager = storeManager; + this.cnx = cnx; + this.tableName = tableName; + //this.columnFamily = columnFamily; + this.storeName = storeName; + this.columnFamilyBytes = columnFamily.getBytes(); + this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); + this.localLockMediator = llm; + this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE); + } + + @Override + public void close() throws BackendException { + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map result = getHelper(Arrays.asList(query.getKey()), getFilter(query)); + return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); + } + + @Override + public Map getSlice(List keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getHelper(keys, getFilter(query)); + } + + @Override + public void mutate(StaticBuffer key, List additions, List deletions, StoreTransaction txh) 
throws BackendException { + Map mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); + mutateMany(mutations, txh); + } + + @Override + public void acquireLock(StaticBuffer key, + StaticBuffer column, + StaticBuffer expectedValue, + StoreTransaction txh) throws BackendException { + + KeyColumn lockID = new KeyColumn(key, column); + logger.debug("Attempting to acquireLock on {} ", lockID); + final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS); + boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime)); + if (!locked) { + throw new PermanentLockingException("Could not lock the keyColumn " + lockID + " on CF {} " + Bytes.toString(columnFamilyBytes)); + } + ((HBaseTransaction) txh).updateLocks(lockID, expectedValue); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), + query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY), + new FilterList(FilterList.Operator.MUST_PASS_ALL), + query); + } + + @Override + public String getName() { + return storeName; + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query); + } + + public static Filter getFilter(SliceQuery query) { + byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null; + byte[] colEndBytes = query.getSliceEnd().length() > 0 ? 
query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null; + + Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false); + + if (query.hasLimit()) { + filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, + filter, + new ColumnPaginationFilter(query.getLimit(), 0)); + } + + logger.debug("Generated HBase Filter {}", filter); + + return filter; + } + + private Map getHelper(List keys, Filter getFilter) throws BackendException { + List requests = new ArrayList(keys.size()); + { + for (StaticBuffer key : keys) { + Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter); + try { + g.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + requests.add(g); + } + } + + Map resultMap = new HashMap(keys.size()); + + try { + TableMask table = null; + Result[] results = null; + + try { + table = cnx.getTable(tableName); + logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); + results = table.get(requests); + logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); + } finally { + IOUtils.closeQuietly(table); + } + + if (results == null) + return KCVSUtil.emptyResults(keys); + + assert results.length==keys.size(); + + for (int i = 0; i < results.length; i++) { + Result result = results[i]; + NavigableMap>> f = result.getMap(); + + if (f == null) { // no result for this key + resultMap.put(keys.get(i), EntryList.EMPTY_LIST); + continue; + } + + // actual key with + NavigableMap> r = f.get(columnFamilyBytes); + resultMap.put(keys.get(i), (r == null) + ? 
EntryList.EMPTY_LIST + : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter)); + } + + return resultMap; + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + } + + private void mutateMany(Map mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh); + } + + private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException { + return executeKeySliceQuery(null, null, filters, columnSlice); + } + + private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey, + @Nullable byte[] endKey, + FilterList filters, + @Nullable SliceQuery columnSlice) throws BackendException { + Scan scan = new Scan().addFamily(columnFamilyBytes); + + try { + scan.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (startKey != null) + scan.setStartRow(startKey); + + if (endKey != null) + scan.setStopRow(endKey); + + if (columnSlice != null) { + filters.addFilter(getFilter(columnSlice)); + } + + TableMask table = null; + + logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey)); + try { + table = cnx.getTable(tableName); + return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes); + } catch (IOException e) { + IOUtils.closeQuietly(table); + throw new PermanentBackendException(e); + } + } + + private class RowIterator implements KeyIterator { + private final Closeable table; + private final Iterator rows; + private final byte[] columnFamilyBytes; + + private Result currentRow; + private boolean isClosed; + + public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) { + this.table = table; + this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length); + this.rows = Iterators.filter(rows.iterator(), new Predicate() { + @Override + public boolean apply(@Nullable 
Result result) { + if (result == null) + return false; + + try { + StaticBuffer id = StaticArrayBuffer.of(result.getRow()); + id.getLong(0); + } catch (NumberFormatException e) { + return false; + } + + return true; + } + }); + } + + @Override + public RecordIterator getEntries() { + ensureOpen(); + + return new RecordIterator() { + private final Iterator>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator(); + + @Override + public boolean hasNext() { + ensureOpen(); + return kv.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return StaticArrayEntry.ofBytes(kv.next(), entryGetter); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean hasNext() { + ensureOpen(); + return rows.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + currentRow = rows.next(); + return StaticArrayBuffer.of(currentRow.getRow()); + } + + @Override + public void close() { + IOUtils.closeQuietly(table); + isClosed = true; + logger.debug("RowIterator closed table {}", table); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + } + + private static class HBaseGetter implements StaticArrayEntry.GetColVal>, byte[]> { + + private final EntryMetaData[] schema; + + private HBaseGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + @Override + public byte[] getColumn(Map.Entry> element) { + return element.getKey(); + } + + @Override + public byte[] getValue(Map.Entry> element) { + return element.getValue().lastEntry().getValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(Map.Entry> element) { + return schema; + } + + @Override + public Object getMetaData(Map.Entry> element, EntryMetaData meta) { + switch(meta) { + 
case TIMESTAMP: + return element.getValue().lastEntry().getKey(); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java new file mode 100644 index 0000000..52f28af --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java @@ -0,0 +1,926 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.thinkaurelius.titan.diskstorage.Backend; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators; +import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.BiMap; +import 
com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.thinkaurelius.titan.core.TitanException; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.Entry; +import com.thinkaurelius.titan.diskstorage.PermanentBackendException; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; +import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.diskstorage.util.BufferUtil; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import com.thinkaurelius.titan.util.system.IOUtils; +import com.thinkaurelius.titan.util.system.NetworkUtil; + +/** + * Storage Manager for HBase + * + * @author Dan LaRocque + */ +@PreInitializeConfigOptions +public class HBaseStoreManager extends DistributedStoreManager implements 
KeyColumnValueStoreManager, CustomizeStoreKCVSManager { + + private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class); + + public static final ConfigNamespace HBASE_NS = + new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options"); + + public static final ConfigOption SHORT_CF_NAMES = + new ConfigOption(HBASE_NS, "short-cf-names", + "Whether to shorten the names of Titan's column families to one-character mnemonics " + + "to conserve storage space", ConfigOption.Type.FIXED, true); + + public static final String COMPRESSION_DEFAULT = "-DEFAULT-"; + + public static final ConfigOption COMPRESSION = + new ConfigOption(HBASE_NS, "compression-algorithm", + "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " + + "The compression algorithm must be installed and available on the HBase cluster. Titan cannot install " + + "and configure new compression algorithms on the HBase cluster by itself.", + ConfigOption.Type.MASKABLE, "GZ"); + + public static final ConfigOption SKIP_SCHEMA_CHECK = + new ConfigOption(HBASE_NS, "skip-schema-check", + "Assume that Titan's HBase table and column families already exist. " + + "When this is true, Titan will not check for the existence of its table/CFs, " + + "nor will it attempt to create them under any circumstances. This is useful " + + "when running Titan without HBase admin privileges.", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption HBASE_TABLE = + new ConfigOption(HBASE_NS, "table", + "The name of the table Titan will use. 
When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) + + " is false, Titan will automatically create this table if it does not already exist.", + ConfigOption.Type.LOCAL, "titan"); + + /** + * Related bug fixed in 0.98.0, 0.94.7, 0.95.0: + * + * https://issues.apache.org/jira/browse/HBASE-8170 + */ + public static final int MIN_REGION_COUNT = 3; + + /** + * The total number of HBase regions to create with Titan's table. This + * setting only effects table creation; this normally happens just once when + * Titan connects to an HBase backend for the first time. + */ + public static final ConfigOption REGION_COUNT = + new ConfigOption(HBASE_NS, "region-count", + "The number of initial regions set when creating Titan's HBase table", + ConfigOption.Type.MASKABLE, Integer.class, new Predicate() { + @Override + public boolean apply(Integer input) { + return null != input && MIN_REGION_COUNT <= input; + } + } + ); + + /** + * This setting is used only when {@link #REGION_COUNT} is unset. + *

+ * If Titan's HBase table does not exist, then it will be created with total + * region count = (number of servers reported by ClusterStatus) * (this + * value). + *

+ * The Apache HBase manual suggests an order-of-magnitude range of potential + * values for this setting: + * + *

    + *
  • + * 2.5.2.7. Managed Splitting: + *
    + * What's the optimal number of pre-split regions to create? Mileage will + * vary depending upon your application. You could start low with 10 + * pre-split regions / server and watch as data grows over time. It's + * better to err on the side of too little regions and rolling split later. + *
    + *
  • + *
  • + * 9.7 Regions: + *
    + * In general, HBase is designed to run with a small (20-200) number of + * relatively large (5-20Gb) regions per server... Typically you want to + * keep your region count low on HBase for numerous reasons. Usually + * right around 100 regions per RegionServer has yielded the best results. + *
    + *
  • + *
+ * + * These considerations may differ for other HBase implementations (e.g. MapR). + */ + public static final ConfigOption REGIONS_PER_SERVER = + new ConfigOption(HBASE_NS, "regions-per-server", + "The number of regions per regionserver to set when creating Titan's HBase table", + ConfigOption.Type.MASKABLE, Integer.class); + + /** + * If this key is present in either the JVM system properties or the process + * environment (checked in the listed order, first hit wins), then its value + * must be the full package and class name of an implementation of + * {@link HBaseCompat} that has a no-arg public constructor. + *

+ * When this is not set, Titan attempts to automatically detect the + * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan + * then checks the returned version string against a hard-coded list of + * supported version prefixes and instantiates the associated compat layer + * if a match is found. + *

+ * When this is set, Titan will not call + * {@code VersionInfo.getVersion()} or read its hard-coded list of supported + * version prefixes. Titan will instead attempt to instantiate the class + * specified (via the no-arg constructor which must exist) and then attempt + * to cast it to HBaseCompat and use it as such. Titan will assume the + * supplied implementation is compatible with the runtime HBase version and + * make no attempt to verify that assumption. + *

+ * Setting this key incorrectly could cause runtime exceptions at best or + * silent data corruption at worst. This setting is intended for users + * running exotic HBase implementations that don't support VersionInfo or + * implementations which return values from {@code VersionInfo.getVersion()} + * that are inconsistent with Apache's versioning convention. It may also be + * useful to users who want to run against a new release of HBase that Titan + * doesn't yet officially support. + * + */ + public static final ConfigOption COMPAT_CLASS = + new ConfigOption(HBASE_NS, "compat-class", + "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " + + "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " + + "at runtime. Setting this option forces Titan to instead reflectively load and instantiate the specified class.", + ConfigOption.Type.MASKABLE, String.class); + + public static final int PORT_DEFAULT = 9160; + + public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI; + + public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE = + new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true); + + private static final BiMap SHORT_CF_NAME_MAP = + ImmutableBiMap.builder() + .put(Backend.INDEXSTORE_NAME, "g") + .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h") + .put(Backend.ID_STORE_NAME, "i") + .put(Backend.EDGESTORE_NAME, "e") + .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f") + .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s") + .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t") + .put(Backend.SYSTEM_MGMT_LOG_NAME, "m") + .put(Backend.SYSTEM_TX_LOG_NAME, "l") + .build(); + + private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4); + + static { + // Verify that shortCfNameMap is 
injective + // Should be guaranteed by Guava BiMap, but it doesn't hurt to check + Preconditions.checkArgument(null != SHORT_CF_NAME_MAP); + Collection shorts = SHORT_CF_NAME_MAP.values(); + Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size()); + } + + // Immutable instance fields + private final String tableName; + private final String compression; + private final int regionCount; + private final int regionsPerServer; + private final ConnectionMask cnx; + private final org.apache.hadoop.conf.Configuration hconf; + private final boolean shortCfNames; + private final boolean skipSchemaCheck; + private final String compatClass; + private final HBaseCompat compat; + + private static final ConcurrentHashMap openManagers = + new ConcurrentHashMap(); + + // Mutable instance state + private final ConcurrentMap openStores; + + private LocalLockMediator llm; + + public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException { + super(config, PORT_DEFAULT); + + checkConfigDeprecation(config); + + this.tableName = config.get(HBASE_TABLE); + this.compression = config.get(COMPRESSION); + this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1; + this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1; + this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK); + this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null; + this.compat = HBaseCompatLoader.getCompat(compatClass); + + /* + * Specifying both region count options is permitted but may be + * indicative of a misunderstanding, so issue a warning. 
+ */ + if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) { + logger.warn("Both {} and {} are set in Titan's configuration, but " + + "the former takes precedence and the latter will be ignored.", + REGION_COUNT, REGIONS_PER_SERVER); + } + + /* This static factory calls HBaseConfiguration.addHbaseResources(), + * which in turn applies the contents of hbase-default.xml and then + * applies the contents of hbase-site.xml. + */ + this.hconf = HBaseConfiguration.create(); + + // Copy a subset of our commons config into a Hadoop config + int keysLoaded=0; + Map configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE); + for (Map.Entry entry : configSub.entrySet()) { + logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue()); + if (entry.getValue()==null) continue; + hconf.set(entry.getKey(), entry.getValue().toString()); + keysLoaded++; + } + + // Special case for STORAGE_HOSTS + if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) { + String zkQuorumKey = "hbase.zookeeper.quorum"; + String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS)); + hconf.set(zkQuorumKey, csHostList); + logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList); + } + + logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded); + + this.shortCfNames = config.get(SHORT_CF_NAMES); + + try { + //this.cnx = HConnectionManager.createConnection(hconf); + this.cnx = compat.createConnection(hconf); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (logger.isTraceEnabled()) { + openManagers.put(this, new Throwable("Manager Opened")); + dumpOpenManagers(); + } + + logger.debug("Dumping HBase config key=value pairs"); + for (Map.Entry entry : hconf) { + logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue()); + } + logger.debug("End of HBase config key=value pairs"); + + openStores = new 
ConcurrentHashMap(); + } + + @Override + public Deployment getDeployment() { + List local; + try { + local = getLocalKeyPartition(); + return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE; + } catch (BackendException e) { + // propagating StorageException might be a better approach + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "hbase[" + tableName + "@" + super.toString() + "]"; + } + + public void dumpOpenManagers() { + int estimatedSize = openManagers.size(); + logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize); + for (HBaseStoreManager m : openManagers.keySet()) { + logger.trace("Manager {} opened at:", m, openManagers.get(m)); + } + logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize); + } + + @Override + public void close() { + openStores.clear(); + if (logger.isTraceEnabled()) + openManagers.remove(this); + IOUtils.closeQuietly(cnx); + } + + @Override + public StoreFeatures getFeatures() { + + Configuration c = GraphDatabaseConfiguration.buildConfiguration(); + + StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder() + .orderedScan(true).unorderedScan(true).batchMutation(true) + .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true) + .timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS) + .locking(true) + .keyConsistent(c); + + try { + fb.localKeyPartition(getDeployment() == Deployment.LOCAL); + } catch (Exception e) { + logger.warn("Unexpected exception during getDeployment()", e); + } + + return fb.build(); + } + + @Override + public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { + logger.debug("Enter mutateMany"); + final MaskedTimestamp commitTime = new MaskedTimestamp(txh); + // In case of an addition and deletion with identical timestamps, the + // deletion tombstone wins. 
+ // http://hbase.apache.org/book/versions.html#d244e4250 + Map> commandsPerKey = + convertToCommands( + mutations, + commitTime.getAdditionTime(times.getUnit()), + commitTime.getDeletionTime(times.getUnit())); + + List batch = new ArrayList(commandsPerKey.size()); // actual batch operation + + // convert sorted commands into representation required for 'batch' operation + for (Pair commands : commandsPerKey.values()) { + if (commands.getFirst() != null) + batch.add(commands.getFirst()); + + if (commands.getSecond() != null) + batch.add(commands.getSecond()); + } + + try { + TableMask table = null; + + try { + table = cnx.getTable(tableName); + logger.debug("mutateMany : batch mutate started size {} ", batch.size()); + table.batch(batch, new Object[batch.size()]); + logger.debug("mutateMany : batch mutate finished {} ", batch.size()); + } finally { + IOUtils.closeQuietly(table); + } + } catch (IOException e) { + throw new TemporaryBackendException(e); + } catch (InterruptedException e) { + throw new TemporaryBackendException(e); + } + + sleepAfterWrite(txh, commitTime); + } + + @Override + public KeyColumnValueStore openDatabase(String longName) throws BackendException { + + return openDatabase(longName, -1); + } + + @Override + public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException { + + HBaseKeyColumnValueStore store = openStores.get(longName); + + if (store == null) { + final String cfName = shortCfNames ? 
shortenCfName(longName) : longName; + + final String llmPrefix = getName(); + llm = LocalLockMediators.INSTANCE.get(llmPrefix, times); + HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm); + + store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we loose to other thread + + if (store == null) { + if (!skipSchemaCheck) + ensureColumnFamilyExists(tableName, cfName, ttlInSeconds); + + store = newStore; + } + logger.info("Loaded 1.x Hbase Client Store Manager"); + } + + return store; + } + + + @Override + public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException { + return new HBaseTransaction(config, llm); + } + + @Override + public String getName() { + return tableName; + } + + /** + * Deletes the specified table with all its columns. + * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss. + */ + @Override + public void clearStorage() throws BackendException { + try (AdminMask adm = getAdminInterface()) { + adm.clearTable(tableName, times.getTime().getNativeTimestamp()); + } catch (IOException e) + { + throw new TemporaryBackendException(e); + } + } + + @Override + public List getLocalKeyPartition() throws BackendException { + + List result = new LinkedList(); + + HTable table = null; + try { + ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0); + + table = new HTable(hconf, tableName); + + Map normed = + normalizeKeyBounds(table.getRegionLocations()); + + for (Map.Entry e : normed.entrySet()) { + if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) { + result.add(e.getKey()); + logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue()); + } else { + logger.debug("Discarding remote {}", e.getValue()); + } + } + } catch (MasterNotRunningException e) { + logger.warn("Unexpected 
MasterNotRunningException", e); + } catch (ZooKeeperConnectionException e) { + logger.warn("Unexpected ZooKeeperConnectionException", e); + } catch (IOException e) { + logger.warn("Unexpected IOException", e); + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + /** + * Given a map produced by {@link HTable#getRegionLocations()}, transform + * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the + * region's start and end key bounds using Titan-partitioning-friendly + * conventions (start inclusive, end exclusive, zero bytes appended where + * necessary to make all keys at least 4 bytes long). + *

+ * This method iterates over the entries in its map parameter and performs + * the following conditional conversions on its keys. "Require" below means + * either a {@link Preconditions} invocation or an assertion. HRegionInfo + * sometimes returns start and end keys of zero length; this method replaces + * zero length keys with null before doing any of the checks described + * below. The parameter map and the values it contains are only read and + * never modified. + * + *

    + *
  • If an entry's HRegionInfo has null start and end keys, then first + * require that the parameter map is a singleton, and then return a + * single-entry map whose {@code KeyRange} has start and end buffers that + * are both four bytes of zeros.
  • + *
  • If the entry has a null end key (but non-null start key), put an + * equivalent entry in the result map with a start key identical to the + * input, except that zeros are appended to values less than 4 bytes long, + * and an end key that is four bytes of zeros. + *
  • If the entry has a null start key (but non-null end key), put an + * equivalent entry in the result map where the start key is four bytes of + * zeros, and the end key has zeros appended, if necessary, to make it at + * least 4 bytes long. The end key is not incremented, because HBase region + * end keys were observed to be exclusive in practice.
  • + *
  • Any entry which matches none of the above criteria results in an + * equivalent entry in the returned map, except that zeros are appended to + * both keys to make each at least 4 bytes long; the end key is otherwise + * left unchanged.
  • + *
+ * + * After iterating over the parameter map, this method checks that it either + * saw no entries with null keys, one entry with a null start key and a + * different entry with a null end key, or one entry with both start and end + * keys null. If any null keys are observed besides these three cases, the + * method will die with a precondition failure. + * + * @param raw + * A map of HRegionInfo and ServerName from HBase + * @return Titan-friendly expression of each region's rowkey boundaries + */ + private Map normalizeKeyBounds(NavigableMap raw) { + + Map.Entry nullStart = null; + Map.Entry nullEnd = null; + + ImmutableMap.Builder b = ImmutableMap.builder(); + + for (Map.Entry e : raw.entrySet()) { + HRegionInfo regionInfo = e.getKey(); + byte startKey[] = regionInfo.getStartKey(); + byte endKey[] = regionInfo.getEndKey(); + + if (0 == startKey.length) { + startKey = null; + logger.trace("Converted zero-length HBase startKey byte array to null"); + } + + if (0 == endKey.length) { + endKey = null; + logger.trace("Converted zero-length HBase endKey byte array to null"); + } + + if (null == startKey && null == endKey) { + Preconditions.checkState(1 == raw.size()); + logger.debug("HBase table {} has a single region {}", tableName, regionInfo); + // Choose arbitrary shared value = startKey = endKey + return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build(); + } else if (null == startKey) { + logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo); + Preconditions.checkState(null == nullStart); + nullStart = e; + // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + // Replace null start key with zeroes + b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue()); + } else if (null == endKey) { + logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo); + 
Preconditions.checkState(null == nullEnd); + nullEnd = e; + // Replace null end key with zeroes + b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue()); + } else { + Preconditions.checkState(null != startKey); + Preconditions.checkState(null != endKey); + + // Convert HBase's inclusive end keys into exclusive Titan end keys + StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey)); + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + + KeyRange kr = new KeyRange(startBuf, endBuf); + b.put(kr, e.getValue()); + logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo); + } + } + + // Require either no null key bounds or a pair of them + Preconditions.checkState(!(null == nullStart ^ null == nullEnd)); + + // Check that every key in the result is at least 4 bytes long + Map result = b.build(); + for (KeyRange kr : result.keySet()) { + Preconditions.checkState(4 <= kr.getStart().length()); + Preconditions.checkState(4 <= kr.getEnd().length()); + } + + return result; + } + + /** + * If the parameter is shorter than 4 bytes, then create and return a new 4 + * byte array with the input array's bytes followed by zero bytes. Otherwise + * return the parameter. 
+ * + * @param dataToPad non-null but possibly zero-length byte array + * @return either the parameter or a new array + */ + private final byte[] zeroExtend(byte[] dataToPad) { + assert null != dataToPad; + + final int targetLength = 4; + + if (targetLength <= dataToPad.length) + return dataToPad; + + byte padded[] = new byte[targetLength]; + + for (int i = 0; i < dataToPad.length; i++) + padded[i] = dataToPad[i]; + + for (int i = dataToPad.length; i < padded.length; i++) + padded[i] = (byte)0; + + return padded; + } + + public static String shortenCfName(String longName) throws PermanentBackendException { + final String s; + if (SHORT_CF_NAME_MAP.containsKey(longName)) { + s = SHORT_CF_NAME_MAP.get(longName); + Preconditions.checkNotNull(s); + logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s); + } else { + if (SHORT_CF_NAME_MAP.containsValue(longName)) { + String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true"; + String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName()); + throw new PermanentBackendException(msg); + } + s = longName; + logger.debug("Kept default CF name \"{}\" because it has no associated short form", s); + } + return s; + } + + private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException { + AdminMask adm = null; + + HTableDescriptor desc; + + try { // Create our table, if necessary + adm = getAdminInterface(); + /* + * Some HBase versions/impls respond badly to attempts to create a + * table without at least one CF. See #661. Creating a CF along with + * the table avoids HBase carping. 
+ */ + if (adm.tableExists(tableName)) { + desc = adm.getTableDescriptor(tableName); + } else { + desc = createTable(tableName, initialCFName, ttlInSeconds, adm); + } + } catch (IOException e) { + throw new TemporaryBackendException(e); + } finally { + IOUtils.closeQuietly(adm); + } + + return desc; + } + + private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException { + HTableDescriptor desc = compat.newTableDescriptor(tableName); + + HColumnDescriptor cdesc = new HColumnDescriptor(cfName); + setCFOptions(cdesc, ttlInSeconds); + + compat.addColumnFamilyToTableDescriptor(desc, cdesc); + + int count; // total regions to create + String src; + + if (MIN_REGION_COUNT <= (count = regionCount)) { + src = "region count configuration"; + } else if (0 < regionsPerServer && + MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) { + src = "ClusterStatus server count"; + } else { + count = -1; + src = "default"; + } + + if (MIN_REGION_COUNT < count) { + adm.createTable(desc, getStartKey(count), getEndKey(count), count); + logger.debug("Created table {} with region count {} from {}", tableName, count, src); + } else { + adm.createTable(desc); + logger.debug("Created table {} with default start key, end key, and region count", tableName); + } + + return desc; + } + + /** + * This method generates the second argument to + * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}. + *

+ * From the {@code createTable} javadoc: + * "The start key specified will become the end key of the first region of + * the table, and the end key specified will become the start key of the + * last region of the table (the first region has a null start key and + * the last region has a null end key)" + *

+ * To summarize, the {@code createTable} argument called "startKey" is + * actually the end key of the first region. + */ + private byte[] getStartKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + /** + * Companion to {@link #getStartKey(int)}. See its javadoc for details. + */ + private byte[] getEndKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException { + AdminMask adm = null; + try { + adm = getAdminInterface(); + HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds); + + Preconditions.checkNotNull(desc); + + HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes()); + + // Create our column family, if necessary + if (cf == null) { + try { + if (!adm.isTableDisabled(tableName)) { + adm.disableTable(tableName); + } + } catch (TableNotEnabledException e) { + logger.debug("Table {} already disabled", tableName); + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + + try { + HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily); + + setCFOptions(cdesc, ttlInSeconds); + + adm.addColumn(tableName, cdesc); + + logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. 
to propogate.", columnFamily); + + adm.enableTable(tableName); + } catch (TableNotFoundException ee) { + logger.error("TableNotFoundException", ee); + throw new PermanentBackendException(ee); + } catch (org.apache.hadoop.hbase.TableExistsException ee) { + logger.debug("Swallowing exception {}", ee); + } catch (IOException ee) { + throw new TemporaryBackendException(ee); + } + } + } finally { + IOUtils.closeQuietly(adm); + } + } + + private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) { + if (null != compression && !compression.equals(COMPRESSION_DEFAULT)) + compat.setCompression(cdesc, compression); + + if (ttlInSeconds > 0) + cdesc.setTimeToLive(ttlInSeconds); + + cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); + } + + /** + * Convert Titan internal Mutation representation into HBase native commands. + * + * @param mutations Mutations to convert into HBase commands. + * @param putTimestamp The timestamp to use for Put commands. + * @param delTimestamp The timestamp to use for Delete commands. + * @return Commands sorted by key converted from Titan internal representation. 
+ * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException + */ + private Map> convertToCommands(Map> mutations, + final long putTimestamp, + final long delTimestamp) throws PermanentBackendException { + Map> commandsPerKey = new HashMap>(); + + for (Map.Entry> entry : mutations.entrySet()) { + + String cfString = getCfNameForStoreName(entry.getKey()); + byte[] cfName = cfString.getBytes(); + + for (Map.Entry m : entry.getValue().entrySet()) { + byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY); + KCVMutation mutation = m.getValue(); + + Pair commands = commandsPerKey.get(m.getKey()); + + if (commands == null) { + commands = new Pair(); + commandsPerKey.put(m.getKey(), commands); + } + + if (mutation.hasDeletions()) { + if (commands.getSecond() == null) { + Delete d = new Delete(key); + compat.setTimestamp(d, delTimestamp); + commands.setSecond(d); + } + + for (StaticBuffer b : mutation.getDeletions()) { + commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp); + } + } + + if (mutation.hasAdditions()) { + if (commands.getFirst() == null) { + Put p = new Put(key, putTimestamp); + commands.setFirst(p); + } + + for (Entry e : mutation.getAdditions()) { + commands.getFirst().add(cfName, + e.getColumnAs(StaticBuffer.ARRAY_FACTORY), + putTimestamp, + e.getValueAs(StaticBuffer.ARRAY_FACTORY)); + } + } + } + } + + return commandsPerKey; + } + + private String getCfNameForStoreName(String storeName) throws PermanentBackendException { + return shortCfNames ? shortenCfName(storeName) : storeName; + } + + private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) { + if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) { + logger.warn("The configuration property {} is ignored for HBase. 
Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.", + ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE)); + } + } + + private AdminMask getAdminInterface() { + try { + return cnx.getAdmin(); + } catch (IOException e) { + throw new TitanException(e); + } + } + + /** + * Similar to {@link Function}, except that the {@code apply} method is allowed + * to throw {@link BackendException}. + */ + private static interface BackendFunction { + + T apply(F input) throws BackendException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java new file mode 100644 index 0000000..e13593f --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * This class overrides and adds nothing compared with + * {@link com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific + * to HBase, which lets us check for user errors like passing a Cassandra + * transaction into a HBase method. + * + * @author Dan LaRocque + */ +public class HBaseTransaction extends AbstractStoreTransaction { + + private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class); + + LocalLockMediator llm; + + Set keyColumnLocks = new LinkedHashSet<>(); + + public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator llm) { + super(config); + this.llm = llm; + } + + @Override + public synchronized void rollback() throws BackendException { + super.rollback(); + log.debug("Rolled back transaction"); + deleteAllLocks(); + } + + @Override + public synchronized void commit() throws BackendException { + super.commit(); + log.debug("Committed transaction"); + deleteAllLocks(); + } + + public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) { + keyColumnLocks.add(lockID); + } + + private void deleteAllLocks() { + for(KeyColumn kc : keyColumnLocks) { + log.debug("Removed lock {} ", kc); + llm.unlock(kc, this); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java new file mode 100644 index 0000000..8660644 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java @@ -0,0 +1,49 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; + +public class HConnection0_98 implements ConnectionMask +{ + + private final HConnection cnx; + + public HConnection0_98(HConnection cnx) + { + this.cnx = cnx; + } + + @Override + public TableMask getTable(String name) throws IOException + { + return new HTable0_98(cnx.getTable(name)); + } + + @Override + public AdminMask getAdmin() throws IOException + { + return new HBaseAdmin0_98(new HBaseAdmin(cnx)); + } + + @Override + public void close() throws IOException + { + cnx.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java new file mode 100644 index 0000000..91e5026 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java @@ -0,0 +1,50 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +public class HConnection1_0 implements ConnectionMask +{ + + private final Connection cnx; + + public HConnection1_0(Connection cnx) + { + this.cnx = cnx; + } + + @Override + public TableMask getTable(String name) throws IOException + { + return new HTable1_0(cnx.getTable(TableName.valueOf(name))); + } + + @Override + public AdminMask getAdmin() throws IOException + { + return new HBaseAdmin1_0(new HBaseAdmin(cnx)); + } + + @Override + public void close() throws IOException + { + cnx.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java new file mode 100644 index 0000000..4ddb2f0 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; + +public class HTable0_98 implements TableMask +{ + private final HTableInterface table; + + public HTable0_98(HTableInterface table) + { + this.table = table; + } + + @Override + public ResultScanner getScanner(Scan filter) throws IOException + { + return table.getScanner(filter); + } + + @Override + public Result[] get(List gets) throws IOException + { + return table.get(gets); + } + + @Override + public void batch(List writes, Object[] results) throws IOException, InterruptedException + { + table.batch(writes, results); + table.flushCommits(); + } + + @Override + public void close() throws IOException + { + table.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java new file mode 100644 index 0000000..5085abb --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; + +public class HTable1_0 implements TableMask +{ + private final Table table; + + public HTable1_0(Table table) + { + this.table = table; + } + + @Override + public ResultScanner getScanner(Scan filter) throws IOException + { + return table.getScanner(filter); + } + + @Override + public Result[] get(List gets) throws IOException + { + return table.get(gets); + } + + @Override + public void batch(List writes, Object[] results) throws IOException, InterruptedException + { + table.batch(writes, results); + /* table.flushCommits(); not needed anymore */ + } + + @Override + public void close() throws IOException + { + table.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java new file mode 100644 index 0000000..dd3d61e --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java @@ -0,0 
+1,40 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course + * of development from 0.94 to 1.0 and beyond. 
+ */ +public interface TableMask extends Closeable +{ + + ResultScanner getScanner(Scan filter) throws IOException; + + Result[] get(List gets) throws IOException; + + void batch(List writes, Object[] results) throws IOException, InterruptedException; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java new file mode 100644 index 0000000..20c59e1 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java @@ -0,0 +1,345 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.locking; + +import com.google.common.base.Preconditions; +import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; +import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider; +import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * This class resolves lock contention between two transactions on the same JVM. + *

+ * This is not just an optimization to reduce network traffic. Locks written by + * Titan to a distributed key-value store contain an identifier, the "Rid", + * which is unique only to the process level. The Rid can't tell which + * transaction in a process holds any given lock. This class prevents two + * transactions in a single process from concurrently writing the same lock to a + * distributed key-value store. + * + * @author Dan LaRocque + */ + +public class LocalLockMediator { + + private static final Logger log = LoggerFactory + .getLogger(LocalLockMediator.class); + + /** + * Namespace for which this mediator is responsible + * + * @see LocalLockMediatorProvider + */ + private final String name; + + private final TimestampProvider times; + + private DelayQueue expiryQueue = new DelayQueue<>(); + + private ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setDaemon(true); + return thread; + } + }); + + + + /** + * Maps a ({@code key}, {@code column}) pair to the local transaction + * holding a lock on that pair. Values in this map may have already expired + * according to {@link AuditRecord#expires}, in which case the lock should + * be considered invalid. + */ + private final ConcurrentHashMap> locks = new ConcurrentHashMap>(); + + public LocalLockMediator(String name, TimestampProvider times) { + this.name = name; + this.times = times; + + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(times); + lockCleanerService.submit(new LockCleaner()); + } + + /** + * Acquire the lock specified by {@code kc}. + *

+ *

+ * For any particular key-column, whatever value of {@code requestor} is + * passed to this method must also be passed to the associated later call to + * {@link #unlock(KeyColumn, ExpectedValueCheckingTransaction)}. + *

+ * If some requestor {@code r} calls this method on a KeyColumn {@code k} + * and this method returns true, then subsequent calls to this method by + * {@code r} on {@code l} merely attempt to update the {@code expiresAt} + * timestamp. This differs from typical lock reentrance: multiple successful + * calls to this method do not require an equal number of calls to + * {@code #unlock()}. One {@code #unlock()} call is enough, no matter how + * many times a {@code requestor} called {@code lock} beforehand. Note that + * updating the timestamp may fail, in which case the lock is considered to + * have expired and the calling context should assume it no longer holds the + * lock specified by {@code kc}. + *

+ * The number of nanoseconds elapsed since the UNIX Epoch is not readily + * available within the JVM. When reckoning expiration times, this method + * uses the approximation implemented by + * {@link com.thinkaurelius.titan.diskstorage.util.NanoTime#getApproxNSSinceEpoch(false)}. + *

+ * The current implementation of this method returns true when given an + * {@code expiresAt} argument in the past. Future implementations may return + * false instead. + * + * @param kc lock identifier + * @param requestor the object locking {@code kc} + * @param expires instant at which this lock will automatically expire + * @return true if the lock is acquired, false if it was not acquired + */ + public boolean lock(KeyColumn kc, T requestor, Timepoint expires) { + assert null != kc; + assert null != requestor; + + AuditRecord audit = new AuditRecord(requestor, expires); + AuditRecord inmap = locks.putIfAbsent(kc, audit); + + boolean success = false; + + if (null == inmap) { + // Uncontended lock succeeded + if (log.isTraceEnabled()) { + log.trace("New local lock created: {} namespace={} txn={}", + new Object[]{kc, name, requestor}); + } + success = true; + } else if (inmap.equals(audit)) { + // requestor has already locked kc; update expiresAt + success = locks.replace(kc, inmap, audit); + if (log.isTraceEnabled()) { + if (success) { + log.trace( + "Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", + new Object[]{kc, name, requestor, inmap.expires, + audit.expires}); + } else { + log.trace( + "Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}", + new Object[]{kc, name, requestor, inmap.expires, + audit.expires}); + } + } + } else if (0 > inmap.expires.compareTo(times.getTime())) { + // the recorded lock has expired; replace it + success = locks.replace(kc, inmap, audit); + if (log.isTraceEnabled()) { + log.trace( + "Discarding expired lock: {} namespace={} txn={} expired={}", + new Object[]{kc, name, inmap.holder, inmap.expires}); + } + } else { + // we lost to a valid lock + if (log.isTraceEnabled()) { + log.trace( + "Local lock failed: {} namespace={} txn={} (already owned by {})", + new Object[]{kc, name, requestor, inmap}); + } + } + + if (success) { + expiryQueue.add(new ExpirableKeyColumn(kc, 
expires)); + } + return success; + } + + /** + * Release the lock specified by {@code kc} and which was previously + * locked by {@code requestor}, if it is possible to release it. + * + * @param kc lock identifier + * @param requestor the object which previously locked {@code kc} + */ + public boolean unlock(KeyColumn kc, T requestor) { + + if (!locks.containsKey(kc)) { + log.info("Local unlock failed: no locks found for {}", kc); + return false; + } + + AuditRecord unlocker = new AuditRecord(requestor, null); + + AuditRecord holder = locks.get(kc); + + if (!holder.equals(unlocker)) { + log.error("Local unlock of {} by {} failed: it is held by {}", + new Object[]{kc, unlocker, holder}); + return false; + } + + boolean removed = locks.remove(kc, unlocker); + + if (removed) { + expiryQueue.remove(kc); + if (log.isTraceEnabled()) { + log.trace("Local unlock succeeded: {} namespace={} txn={}", + new Object[]{kc, name, requestor}); + } + } else { + log.warn("Local unlock warning: lock record for {} disappeared " + + "during removal; this suggests the lock either expired " + + "while we were removing it, or that it was erroneously " + + "unlocked multiple times.", kc); + } + + // Even if !removed, we're finished unlocking, so return true + return true; + } + + public String toString() { + return "LocalLockMediator [" + name + ", ~" + locks.size() + + " current locks]"; + } + + /** + * A record containing the local transaction that holds a lock and the + * lock's expiration time. + */ + private static class AuditRecord { + + /** + * The local transaction that holds/held the lock. + */ + private final T holder; + /** + * The expiration time of a the lock. + */ + private final Timepoint expires; + /** + * Cached hashCode. + */ + private int hashCode; + + private AuditRecord(T holder, Timepoint expires) { + this.holder = holder; + this.expires = expires; + } + + /** + * This implementation depends only on the lock holder and not on the + * lock expiration time. 
+ */ + @Override + public int hashCode() { + if (0 == hashCode) + hashCode = holder.hashCode(); + + return hashCode; + } + + /** + * This implementation depends only on the lock holder and not on the + * lock expiration time. + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + /* + * This warning suppression is harmless because we are only going to + * call other.holder.equals(...), and since equals(...) is part of + * Object, it is guaranteed to be defined no matter the concrete + * type of parameter T. + */ + @SuppressWarnings("rawtypes") + AuditRecord other = (AuditRecord) obj; + if (holder == null) { + if (other.holder != null) + return false; + } else if (!holder.equals(other.holder)) + return false; + return true; + } + + @Override + public String toString() { + return "AuditRecord [txn=" + holder + ", expires=" + expires + "]"; + } + + } + + private class LockCleaner implements Runnable { + + @Override + public void run() { + try { + while (true) { + log.debug("Lock Cleaner service started"); + ExpirableKeyColumn lock = expiryQueue.take(); + log.debug("Expiring key column " + lock.getKeyColumn()); + locks.remove(lock.getKeyColumn()); + } + } catch (InterruptedException e) { + log.debug("Received interrupt. 
Exiting"); + } + } + } + + private static class ExpirableKeyColumn implements Delayed { + + private Timepoint expiryTime; + private KeyColumn kc; + + public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime) { + this.kc = keyColumn; + this.expiryTime = expiryTime; + } + + @Override + public long getDelay(TimeUnit unit) { + return expiryTime.getTimestamp(TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) < ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) { + return -1; + } + if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) > ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) { + return 1; + } + return 0; + } + + public KeyColumn getKeyColumn() { + return kc; + } + } +}