atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-114 Upgrade Hbase client to 1.1.2(sumasai)
Date Wed, 11 Nov 2015 16:32:10 GMT
ATLAS-114 Upgrade Hbase client to 1.1.2(sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ebc4502a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ebc4502a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ebc4502a

Branch: refs/heads/master
Commit: ebc4502a815189848daadb9be3288d6e76bcd3e7
Parents: 5bc6f6b
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Wed Nov 11 21:57:33 2015 +0530
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Wed Nov 11 22:00:49 2015 +0530

----------------------------------------------------------------------
 LICENSE.txt                                     |  17 +
 distro/src/bin/atlas_config.py                  |  20 +-
 distro/src/bin/atlas_start.py                   |  13 +-
 distro/src/conf/application.properties          |   4 +-
 docs/src/site/twiki/Configuration.twiki         |   9 +
 docs/src/site/twiki/InstallationSteps.twiki     |  17 +
 pom.xml                                         |   3 +-
 release-log.txt                                 |   1 +
 repository/pom.xml                              |   9 +-
 .../titan/diskstorage/hbase/AdminMask.java      |  62 ++
 .../titan/diskstorage/hbase/ConnectionMask.java |  30 +
 .../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 +++
 .../titan/diskstorage/hbase/HBaseAdmin1_0.java  | 135 +++
 .../titan/diskstorage/hbase/HBaseCompat.java    |  61 ++
 .../diskstorage/hbase/HBaseCompat0_98.java      |  58 ++
 .../titan/diskstorage/hbase/HBaseCompat1_0.java |  59 ++
 .../titan/diskstorage/hbase/HBaseCompat1_1.java |  58 ++
 .../diskstorage/hbase/HBaseCompatLoader.java    |  80 ++
 .../hbase/HBaseKeyColumnValueStore.java         | 368 ++++++++
 .../diskstorage/hbase/HBaseStoreManager.java    | 925 +++++++++++++++++++
 .../diskstorage/hbase/HBaseTransaction.java     |  33 +
 .../diskstorage/hbase/HConnection0_98.java      |  49 +
 .../titan/diskstorage/hbase/HConnection1_0.java |  50 +
 .../titan/diskstorage/hbase/HTable0_98.java     |  60 ++
 .../titan/diskstorage/hbase/HTable1_0.java      |  61 ++
 .../titan/diskstorage/hbase/TableMask.java      |  40 +
 .../titan/diskstorage/solr/Solr5Index.java      |  20 +-
 .../src/main/resources/application.properties   |   3 +
 28 files changed, 2374 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index a334e30..8357214 100755
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -248,3 +248,20 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 POSSIBILITY OF SUCH DAMAGE.
 
+-----------------------------------------------------------------------
+ Titan Apache 2.0 License
+-----------------------------------------------------------------------
+
+  Copyright 2012-2013 Aurelius LLC
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/distro/src/bin/atlas_config.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py
index f545a30..5326d5d 100755
--- a/distro/src/bin/atlas_config.py
+++ b/distro/src/bin/atlas_config.py
@@ -17,6 +17,7 @@
 # limitations under the License.
 import getpass
 import os
+import re
 import platform
 import subprocess
 from threading import Thread
@@ -31,7 +32,7 @@ CONF = "conf"
 LOG="logs"
 WEBAPP="server" + os.sep + "webapp"
 DATA="data"
-ENV_KEYS = ["JAVA_HOME", "METADATA_OPTS", "METADATA_LOG_DIR", "METADATA_PID_DIR", "METADATA_CONF", "METADATACPPATH", "METADATA_DATA_DIR", "METADATA_HOME_DIR", "METADATA_EXPANDED_WEBAPP_DIR"]
+ENV_KEYS = ["JAVA_HOME", "METADATA_OPTS", "METADATA_LOG_DIR", "METADATA_PID_DIR", "METADATA_CONF", "METADATACPPATH", "METADATA_DATA_DIR", "METADATA_HOME_DIR", "METADATA_EXPANDED_WEBAPP_DIR", "HBASE_CONF_DIR"]
 METADATA_CONF = "METADATA_CONF"
 METADATA_LOG = "METADATA_LOG_DIR"
 METADATA_PID = "METADATA_PID_DIR"
@@ -39,6 +40,7 @@ METADATA_WEBAPP = "METADATA_EXPANDED_WEBAPP_DIR"
 METADATA_OPTS = "METADATA_OPTS"
 METADATA_DATA = "METADATA_DATA_DIR"
 METADATA_HOME = "METADATA_HOME_DIR"
+HBASE_CONF_DIR = "HBASE_CONF_DIR"
 IS_WINDOWS = platform.system() == "Windows"
 ON_POSIX = 'posix' in sys.builtin_module_names
 DEBUG = False
@@ -60,6 +62,10 @@ def confDir(dir):
     localconf = os.path.join(dir, CONF)
     return os.environ.get(METADATA_CONF, localconf)
 
+def hbaseConfDir(atlasConfDir):
+    parentDir = os.path.dirname(atlasConfDir)
+    return os.environ.get(HBASE_CONF_DIR, os.path.join(parentDir, "hbase", CONF))
+
 def logDir(dir):
     localLog = os.path.join(dir, LOG)
     return os.environ.get(METADATA_LOG, localLog)
@@ -320,8 +326,14 @@ def win_exist_pid(pid):
     return False
 
 def server_already_running(pid):
-      print "Atlas server is already running under process %s" % pid
-      sys.exit()  
+    print "Atlas server is already running under process %s" % pid
+    sys.exit()  
     
 def server_pid_not_running(pid):
-      print "The Server is no longer running with pid %s" %pid
+    print "The Server is no longer running with pid %s" %pid
+
+def grep(file, value):
+    for line in open(file).readlines():
+        if re.match(value, line):	
+           return line
+    return None

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/distro/src/bin/atlas_start.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_start.py b/distro/src/bin/atlas_start.py
index 27a8365..08e9bdc 100755
--- a/distro/src/bin/atlas_start.py
+++ b/distro/src/bin/atlas_start.py
@@ -25,6 +25,8 @@ METADATA_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
 METADATA_COMMAND_OPTS="-Datlas.home=%s"
 METADATA_CONFIG_OPTS="-Datlas.conf=%s"
 DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml"
+CONF_FILE="application.properties"
+HBASE_STORAGE_CONF_ENTRY="atlas.graph.storage.backend\s*=\s*hbase"
 
 def main():
 
@@ -50,12 +52,21 @@ def main():
     web_app_dir = mc.webAppDir(metadata_home)
     mc.expandWebApp(metadata_home)
 
+    #add hbase-site.xml to classpath
+    hbase_conf_dir = mc.hbaseConfDir(confdir)
+
     p = os.pathsep
     metadata_classpath = confdir + p \
                        + os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \
                        + os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" )  + p \
                        + os.path.join(metadata_home, "libext", "*")
-
+    if os.path.exists(hbase_conf_dir):
+        metadata_classpath = metadata_classpath + p \
+                            + hbase_conf_dir
+    else: 
+       storage_backend = mc.grep(os.path.join(confdir, CONF_FILE), HBASE_STORAGE_CONF_ENTRY)
+       if storage_backend != None:
+	   raise Exception("Could not find hbase-site.xml in %s. Please set env var HBASE_CONF_DIR to the hbase client conf dir", hbase_conf_dir)
     
     metadata_pid_file = mc.pidFile(metadata_home)
     

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/distro/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index 5400149..dad8dea 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -25,7 +25,9 @@ atlas.graph.storage.directory=${sys:atlas.home}/data/berkley
 #hbase
 #For standalone mode , specify localhost
 #for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
-#atlas.graph.storage.hostname=localhost
+atlas.graph.storage.hostname=localhost
+atlas.graph.storage.hbase.regions-per-server=1
+atlas.graph.storage.lock.wait-time=10000
 
 #Solr
 #atlas.graph.index.search.backend=solr

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 7549808..63dd725 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -33,10 +33,19 @@ atlas.graph.storage.backend=hbase
 atlas.graph.storage.hostname=<ZooKeeper Quorum>
 </verbatim>
 
+HBASE_CONF_DIR environment variable needs to be set to point to the Hbase client configuration directory which is added to classpath when Atlas starts up.
+hbase-site.xml needs to have the following properties set according to the cluster setup
+<verbatim>
+#Set below to /hbase-secure if the Hbase server is setup in secure mode
+zookeeper.znode.parent=/hbase-unsecure
+</verbatim>
+
 Advanced configuration
 
+# If you are planning to use any of the configs mentioned below, they need to be prefixed with "atlas.graph." to take effect in ATLAS
 Refer http://s3.thinkaurelius.com/docs/titan/0.5.4/titan-config-ref.html#_storage_hbase
 
+
 ---++++ Graph Search Index
 This section sets up the graph db - titan - to use an search indexing system. The example
 configuration below setsup to use an embedded Elastic search indexing system.

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 1c31897..fa7b858 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -121,6 +121,16 @@ and change it to look as below
 export METADATA_SERVER_OPTS="-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="
 </verbatim>
 
+* Hbase as the Storage Backend for the Graph Repository
+
+By default, Atlas uses Titan as the graph repository and is the only graph repository implementation available currently.
+The HBase versions currently supported are 0.98.x, 1.0.x, 1.1.x. For configuring ATLAS graph persistence on HBase, please go through the "Configuration - Graph persistence engine - HBase" section
+for more details.
+
+Pre-requisites for running HBase as a distributed cluster
+ * 3 or 5 ZooKeeper nodes
+ * Atleast 3 RegionServer nodes. It would be ideal to run the DataNodes on the same hosts as the Region servers for data locality.
+
 * Configuring SOLR as the Indexing Backend for the Graph Repository
 
 By default, Atlas uses Titan as the graph repository and is the only graph repository implementation available currently.
@@ -152,6 +162,13 @@ For configuring Titan to work with Solr, please follow the instructions below
 
 For more information on Titan solr configuration , please refer http://s3.thinkaurelius.com/docs/titan/0.5.4/solr.htm
 
+Pre-requisites for running Solr in cloud mode
+  * Memory - Solr is both memory and CPU intensive. Make sure the server running Solr has adequate memory, CPU and disk.
+    Solr works well with 32GB RAM. Plan to provide as much memory as possible to Solr process
+  * Disk - If the number of entities that need to be stored are large, plan to have at least 500 GB free space in the volume where Solr is going to store the index data
+  * SolrCloud has support for replication and sharding. It is highly recommended to use SolrCloud with at least two Solr nodes running on different servers with replication enabled.
+    If using SolrCloud, then you also need ZooKeeper installed and configured with 3 or 5 ZooKeeper nodes
+
 *Starting Atlas Server*
 <verbatim>
 bin/atlas_start.py [-port <port>]

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 38d2e5b..0028d31 100755
--- a/pom.xml
+++ b/pom.xml
@@ -328,7 +328,7 @@
         <tinkerpop.version>2.6.0</tinkerpop.version>
         <titan.version>0.5.4</titan.version>
         <hadoop.version>2.7.0</hadoop.version>
-        <hbase.version>0.98.9-hadoop2</hbase.version>
+        <hbase.version>1.1.2</hbase.version>
         <solr.version>5.1.0</solr.version>
         <kafka.version>0.8.2.0</kafka.version>
         <!-- scala versions -->
@@ -1512,6 +1512,7 @@
                         <exclude>**/build.log</exclude>
                         <exclude>.bowerrc</exclude>
                         <exclude>*.json</exclude>
+                        <exclude>**/overlays/**</exclude>
                     </excludes>
                 </configuration>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index adeef31..05c7d5b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-114 Upgrade hbase client to 1.1.2 (sumasai)
 ATLAS-296 IllegalArgumentException during hive HiveHookIT integration tests (tbeerbower via shwethags)
 ATLAS-158 Provide Atlas Entity Change Notification (tbeerbower via shwethags)
 ATALS-238 atlas_start.py- the Atlas server won’t restart after improper shutdown(ndjouri via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index 3e2a6d1..28107e0 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -114,10 +114,11 @@
             <artifactId>titan-berkeleyje</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>com.thinkaurelius.titan</groupId>
-            <artifactId>titan-hbase</artifactId>
-        </dependency>
+        <!-- Commenting out since titan-hbase classes are shaded for 1.x support -->
+        <!--<dependency>-->
+            <!--<groupId>com.thinkaurelius.titan</groupId>-->
+            <!--<artifactId>titan-hbase</artifactId>-->
+        <!--</dependency>-->
 
         <dependency>
             <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
new file mode 100644
index 0000000..e255f1b
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface AdminMask extends Closeable
+{
+
+    void clearTable(String tableName, long timestamp) throws IOException;
+
+    HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
+
+    boolean tableExists(String tableName) throws IOException;
+
+    void createTable(HTableDescriptor desc) throws IOException;
+
+    void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
+
+    /**
+     * Estimate the number of regionservers in the HBase cluster.
+     *
+     * This is usually implemented by calling
+     * {@link HBaseAdmin#getClusterStatus()} and then
+     * {@link ClusterStatus#getServers()} and finally {@code size()} on the
+     * returned server list.
+     *
+     * @return the number of servers in the cluster or -1 if it could not be determined
+     */
+    int getEstimatedRegionServerCount();
+
+    void disableTable(String tableName) throws IOException;
+
+    void enableTable(String tableName) throws IOException;
+
+    boolean isTableDisabled(String tableName) throws IOException;
+
+    void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
new file mode 100644
index 0000000..feb578b
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface ConnectionMask extends Closeable
+{
+
+    TableMask getTable(String name) throws IOException;
+
+    AdminMask getAdmin() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
new file mode 100644
index 0000000..0cd4795
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thinkaurelius.titan.util.system.IOUtils;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+public class HBaseAdmin0_98 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
+
+    private final HBaseAdmin adm;
+
+    public HBaseAdmin0_98(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+
+    @Override
+    public void clearTable(String tableName, long timestamp) throws IOException
+    {
+        if (!adm.tableExists(tableName)) {
+            log.debug("clearStorage() called before table {} was created, skipping.", tableName);
+            return;
+        }
+
+        // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
+        // disabling and deleting tables.
+        HTable table = null;
+
+        try {
+            table = new HTable(adm.getConfiguration(), tableName);
+
+            Scan scan = new Scan();
+            scan.setBatch(100);
+            scan.setCacheBlocks(false);
+            scan.setCaching(2000);
+            scan.setTimeRange(0, Long.MAX_VALUE);
+            scan.setMaxVersions(1);
+
+            ResultScanner scanner = null;
+
+            try {
+                scanner = table.getScanner(scan);
+
+                for (Result res : scanner) {
+                    Delete d = new Delete(res.getRow());
+
+                    d.setTimestamp(timestamp);
+                    table.delete(d);
+                }
+            } finally {
+                IOUtils.closeQuietly(scanner);
+            }
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(tableName.getBytes());
+    }
+
+    @Override
+    public boolean tableExists(String tableName) throws IOException
+    {
+        return adm.tableExists(tableName);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableName) throws IOException
+    {
+        adm.disableTable(tableName);
+    }
+
+    @Override
+    public void enableTable(String tableName) throws IOException
+    {
+        adm.enableTable(tableName);
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableName) throws IOException
+    {
+        return adm.isTableDisabled(tableName);
+    }
+
+    @Override
+    public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(tableName, columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
new file mode 100644
index 0000000..7e8f72d
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class HBaseAdmin1_0 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
+
+    private final Admin adm;
+
+    public HBaseAdmin1_0(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+    @Override
+    public void clearTable(String tableString, long timestamp) throws IOException
+    {
+        TableName tableName = TableName.valueOf(tableString);
+
+        if (!adm.tableExists(tableName)) {
+            log.debug("Attempted to clear table {} before it exists (noop)", tableString);
+            return;
+        }
+
+        if (!adm.isTableDisabled(tableName))
+            adm.disableTable(tableName);
+
+        if (!adm.isTableDisabled(tableName))
+            throw new RuntimeException("Unable to disable table " + tableName);
+
+        // This API call appears to both truncate and reenable the table.
+        log.info("Truncating table {}", tableName);
+        adm.truncateTable(tableName, true /* preserve splits */);
+
+        try {
+            adm.enableTable(tableName);
+        } catch (TableNotDisabledException e) {
+            // This triggers seemingly every time in testing with 1.0.2.
+            log.debug("Table automatically reenabled by truncation: {}", tableName, e);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean tableExists(String tableString) throws IOException
+    {
+        return adm.tableExists(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableString) throws IOException
+    {
+        adm.disableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void enableTable(String tableString) throws IOException
+    {
+        adm.enableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableString) throws IOException
+    {
+        return adm.isTableDisabled(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
new file mode 100644
index 0000000..3bc6c25
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
+
+public interface HBaseCompat {
+
+    /**
+     * Configure the compression scheme {@code algo} on a column family
+     * descriptor {@code cd}. The {@code algo} parameter is a string value
+     * corresponding to one of the values of HBase's Compression enum. The
+     * Compression enum has moved between packages as HBase has evolved, which
+     * is why this method has a String argument in the signature instead of the
+     * enum itself.
+     *
+     * @param cd
+     *            column family to configure
+     * @param algo
+     *            compression type to use
+     */
+    public void setCompression(HColumnDescriptor cd, String algo);
+
+    /**
+     * Create and return a HTableDescriptor instance with the given name. The
+     * constructors on this method have remained stable over HBase development
+     * so far, but the old HTableDescriptor(String) constructor & byte[] friends
+     * are now marked deprecated and may eventually be removed in favor of the
+     * HTableDescriptor(TableName) constructor. That constructor (and the
+     * TableName type) only exists in newer HBase versions. Hence this method.
+     *
+     * @param tableName
+     *            HBase table name
+     * @return a new table descriptor instance
+     */
+    public HTableDescriptor newTableDescriptor(String tableName);
+
+    ConnectionMask createConnection(Configuration conf) throws IOException;
+
+    void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
+
+    void setTimestamp(Delete d, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
new file mode 100644
index 0000000..2c0f3b4
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat0_98 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection0_98(HConnectionManager.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
new file mode 100644
index 0000000..633e525
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat1_0 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
new file mode 100644
index 0000000..e5c3d31
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+import java.io.IOException;
+
+public class HBaseCompat1_1 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
new file mode 100644
index 0000000..2c0d6fe
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseCompatLoader {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
+
+    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
+
+    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
+
+    private static HBaseCompat cachedCompat;
+
+    public synchronized static HBaseCompat getCompat(String classOverride) {
+
+        if (null != cachedCompat) {
+            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
+            return cachedCompat;
+        }
+
+        HBaseCompat compat;
+        String className = null;
+        String classNameSource = null;
+
+        if (null != classOverride) {
+            className = classOverride;
+            classNameSource = "from explicit configuration";
+        } else {
+            String hbaseVersion = VersionInfo.getVersion();
+            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
+                if (hbaseVersion.startsWith(supportedVersion + ".")) {
+                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
+                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
+                    break;
+                }
+            }
+            if (null == className) {
+                log.info("The HBase version {} is not explicitly supported by Titan.  " +
+                         "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
+                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
+                className = DEFAULT_HBASE_CLASS_NAME;
+                classNameSource = " by default";
+            }
+        }
+
+        final String errTemplate = " when instantiating HBase compatibility class " + className;
+
+        try {
+            compat = (HBaseCompat)Class.forName(className).newInstance();
+            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        }
+
+        return cachedCompat = compat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ebc4502a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
new file mode 100644
index 0000000..7783a43
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.util.system.IOUtils;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Here are some areas that might need work:
+ * <p/>
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false)
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ * all columns on the row where it reaches its limit.  This requires getSlice,
+ * currently, to impose its limit on the client side.  That obviously won't
+ * scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ * <p/>
+ * There may be other problem areas.  These are just the ones of which I'm aware.
+ */
+public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
+
+    private final String tableName;
+    private final HBaseStoreManager storeManager;
+
+    // When using shortened CF names, columnFamily is the shortname and storeName is the longname
+    // When not using shortened CF names, they are the same
+    //private final String columnFamily;
+    private final String storeName;
+    // This is columnFamily.getBytes()
+    private final byte[] columnFamilyBytes;
+    private final HBaseGetter entryGetter;
+
+    private final ConnectionMask cnx;
+
+    HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) {
+        this.storeManager = storeManager;
+        this.cnx = cnx;
+        this.tableName = tableName;
+        //this.columnFamily = columnFamily;
+        this.storeName = storeName;
+        this.columnFamilyBytes = columnFamily.getBytes();
+        this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
+    }
+
+    @Override
+    public void close() throws BackendException {
+    }
+
+    @Override
+    public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query));
+        return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
+    }
+
+    @Override
+    public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+        return getHelper(keys, getFilter(query));
+    }
+
+    @Override
+    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
+        mutateMany(mutations, txh);
+    }
+
+    @Override
+    public void acquireLock(StaticBuffer key,
+                            StaticBuffer column,
+                            StaticBuffer expectedValue,
+                            StoreTransaction txh) throws BackendException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
+                query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY),
+                new FilterList(FilterList.Operator.MUST_PASS_ALL),
+                query);
+    }
+
+    @Override
+    public String getName() {
+        return storeName;
+    }
+
+    @Override
+    public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query);
+    }
+
+    public static Filter getFilter(SliceQuery query) {
+        byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
+        byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
+
+        Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
+
+        if (query.hasLimit()) {
+            filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    filter,
+                    new ColumnPaginationFilter(query.getLimit(), 0));
+        }
+
+        logger.debug("Generated HBase Filter {}", filter);
+
+        return filter;
+    }
+
+    private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
+        List<Get> requests = new ArrayList<Get>(keys.size());
+        {
+            for (StaticBuffer key : keys) {
+                Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
+                try {
+                    g.setTimeRange(0, Long.MAX_VALUE);
+                } catch (IOException e) {
+                    throw new PermanentBackendException(e);
+                }
+                requests.add(g);
+            }
+        }
+
+        Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
+
+        try {
+            TableMask table = null;
+            Result[] results = null;
+
+            try {
+                table = cnx.getTable(tableName);
+                results = table.get(requests);
+            } finally {
+                IOUtils.closeQuietly(table);
+            }
+
+            if (results == null)
+                return KCVSUtil.emptyResults(keys);
+
+            assert results.length==keys.size();
+
+            for (int i = 0; i < results.length; i++) {
+                Result result = results[i];
+                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
+
+                if (f == null) { // no result for this key
+                    resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
+                    continue;
+                }
+
+                // actual key with <timestamp, value>
+                NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
+                resultMap.put(keys.get(i), (r == null)
+                                            ? EntryList.EMPTY_LIST
+                                            : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
+            }
+
+            return resultMap;
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+        storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
+    }
+
+    private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
+        return executeKeySliceQuery(null, null, filters, columnSlice);
+    }
+
+    private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
+                                            @Nullable byte[] endKey,
+                                            FilterList filters,
+                                            @Nullable SliceQuery columnSlice) throws BackendException {
+        Scan scan = new Scan().addFamily(columnFamilyBytes);
+
+        try {
+            scan.setTimeRange(0, Long.MAX_VALUE);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        if (startKey != null)
+            scan.setStartRow(startKey);
+
+        if (endKey != null)
+            scan.setStopRow(endKey);
+
+        if (columnSlice != null) {
+            filters.addFilter(getFilter(columnSlice));
+        }
+
+        TableMask table = null;
+
+        try {
+            table = cnx.getTable(tableName);
+            return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
+        } catch (IOException e) {
+            IOUtils.closeQuietly(table);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    private class RowIterator implements KeyIterator {
+        private final Closeable table;
+        private final Iterator<Result> rows;
+        private final byte[] columnFamilyBytes;
+
+        private Result currentRow;
+        private boolean isClosed;
+
+        public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
+            this.table = table;
+            this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
+            this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
+                @Override
+                public boolean apply(@Nullable Result result) {
+                    if (result == null)
+                        return false;
+
+                    try {
+                        StaticBuffer id = StaticArrayBuffer.of(result.getRow());
+                        id.getLong(0);
+                    } catch (NumberFormatException e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+        }
+
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            ensureOpen();
+
+            return new RecordIterator<Entry>() {
+                private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator();
+
+                @Override
+                public boolean hasNext() {
+                    ensureOpen();
+                    return kv.hasNext();
+                }
+
+                @Override
+                public Entry next() {
+                    ensureOpen();
+                    return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
+                }
+
+                @Override
+                public void close() {
+                    isClosed = true;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+            return rows.hasNext();
+        }
+
+        @Override
+        public StaticBuffer next() {
+            ensureOpen();
+
+            currentRow = rows.next();
+            return StaticArrayBuffer.of(currentRow.getRow());
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(table);
+            isClosed = true;
+            logger.debug("RowIterator closed table {}", table);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        private void ensureOpen() {
+            if (isClosed)
+                throw new IllegalStateException("Iterator has been closed.");
+        }
+    }
+
+    private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
+
+        private final EntryMetaData[] schema;
+
+        private HBaseGetter(EntryMetaData[] schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getKey();
+        }
+
+        @Override
+        public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getValue().lastEntry().getValue();
+        }
+
+        @Override
+        public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return schema;
+        }
+
+        @Override
+        public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
+            switch(meta) {
+                case TIMESTAMP:
+                    return element.getValue().lastEntry().getKey();
+                default:
+                    throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+            }
+        }
+    }
+}


Mime
View raw message