atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dkan...@apache.org
Subject [8/9] incubator-atlas git commit: ATLAS-693 Titan 0.5.4 implementation of graph db abstraction. (jnhagelb via dkantor)
Date Fri, 05 Aug 2016 23:27:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/pom.xml
----------------------------------------------------------------------
diff --git a/graphdb/titan0/pom.xml b/graphdb/titan0/pom.xml
new file mode 100644
index 0000000..f2dc9a8
--- /dev/null
+++ b/graphdb/titan0/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements.  See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership.  The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License.  You may obtain a copy of the License at
+~
+~     http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>atlas-graphdb</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>0.8-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>atlas-graphdb-titan0</artifactId>
+    <description>Apache Atlas Titan 0.5.4 Graph DB Impl</description>
+    <name>Apache Atlas Titan 0.5.4 Graph DB Impl</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <tinkerpop.version>2.6.0</tinkerpop.version>
+        <titan.version>0.5.4</titan.version>
+    </properties>
+
+    <dependencies>
+
+        <!-- for graphdb interface definitions -->
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-graphdb-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+       <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-graphdb-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-core</artifactId>
+            <version>${titan.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.tinkerpop.blueprints</groupId>
+            <artifactId>blueprints-core</artifactId>
+            <version>${tinkerpop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.tinkerpop.gremlin</groupId>
+            <artifactId>gremlin-java</artifactId>
+            <version>${tinkerpop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.vividsolutions</groupId>
+            <artifactId>jts</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-es</artifactId>
+            <version>${titan.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-berkeleyje</artifactId>
+            <version>${titan.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-lucene</artifactId>
+            <version>${titan.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!--
+                 Create 'uber' jar that contains all of the dependencies (except those whose scope is provided)
+                 Only Titan 0l5l4 and its dependencies are included.  The other dependencies are bundled in the war file.
+            -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <artifactSet>
+                                <excludes>
+                                    <!-- these are bundled with Atlas -->
+                                    <exclude>org.slf4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>com.thinkaurelius.titan:titan-core</artifact>
+                                    <!-- force use of our custom LocalLockMediator implementation -->
+                                    <excludes>
+                                        <exclude>com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator*</exclude>
+                                    </excludes>
+                                </filter>
+
+                            </filters>
+                            <createSourcesJar>true</createSourcesJar>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- Graph DB -->
+            <dependency>
+                <groupId>com.tinkerpop.blueprints</groupId>
+                <artifactId>blueprints-core</artifactId>
+                <version>${tinkerpop.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-core</artifactId>
+                <version>${titan.version}</version>
+                <exclusions>
+                    <!-- rexster does not work with servlet-api -->
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-server</artifactId>
+                    </exclusion>
+                    <!-- asm 4.0 does not work with jersey asm 3.1 -->
+                    <exclusion>
+                        <groupId>com.tinkerpop</groupId>
+                        <artifactId>frames</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.esotericsoftware.reflectasm</groupId>
+                        <artifactId>reflectasm</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.ow2.asm</groupId>
+                        <artifactId>asm</artifactId>
+                    </exclusion>
+                    <exclusion> <!-- GPL license imported from ganglia -->
+                        <groupId>org.acplt</groupId>
+                        <artifactId>oncrpc</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-berkeleyje</artifactId>
+                <version>${titan.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-hbase</artifactId>
+                <version>${titan.version}</version>
+            </dependency>
+
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
new file mode 100644
index 0000000..e255f1b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface AdminMask extends Closeable
+{
+
+    void clearTable(String tableName, long timestamp) throws IOException;
+
+    HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
+
+    boolean tableExists(String tableName) throws IOException;
+
+    void createTable(HTableDescriptor desc) throws IOException;
+
+    void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
+
+    /**
+     * Estimate the number of regionservers in the HBase cluster.
+     *
+     * This is usually implemented by calling
+     * {@link HBaseAdmin#getClusterStatus()} and then
+     * {@link ClusterStatus#getServers()} and finally {@code size()} on the
+     * returned server list.
+     *
+     * @return the number of servers in the cluster or -1 if it could not be determined
+     */
+    int getEstimatedRegionServerCount();
+
+    void disableTable(String tableName) throws IOException;
+
+    void enableTable(String tableName) throws IOException;
+
+    boolean isTableDisabled(String tableName) throws IOException;
+
+    void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
new file mode 100644
index 0000000..feb578b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface ConnectionMask extends Closeable
+{
+
+    TableMask getTable(String name) throws IOException;
+
+    AdminMask getAdmin() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
new file mode 100644
index 0000000..0cd4795
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thinkaurelius.titan.util.system.IOUtils;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+public class HBaseAdmin0_98 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
+
+    private final HBaseAdmin adm;
+
+    public HBaseAdmin0_98(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+
+    @Override
+    public void clearTable(String tableName, long timestamp) throws IOException
+    {
+        if (!adm.tableExists(tableName)) {
+            log.debug("clearStorage() called before table {} was created, skipping.", tableName);
+            return;
+        }
+
+        // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
+        // disabling and deleting tables.
+        HTable table = null;
+
+        try {
+            table = new HTable(adm.getConfiguration(), tableName);
+
+            Scan scan = new Scan();
+            scan.setBatch(100);
+            scan.setCacheBlocks(false);
+            scan.setCaching(2000);
+            scan.setTimeRange(0, Long.MAX_VALUE);
+            scan.setMaxVersions(1);
+
+            ResultScanner scanner = null;
+
+            try {
+                scanner = table.getScanner(scan);
+
+                for (Result res : scanner) {
+                    Delete d = new Delete(res.getRow());
+
+                    d.setTimestamp(timestamp);
+                    table.delete(d);
+                }
+            } finally {
+                IOUtils.closeQuietly(scanner);
+            }
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(tableName.getBytes());
+    }
+
+    @Override
+    public boolean tableExists(String tableName) throws IOException
+    {
+        return adm.tableExists(tableName);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableName) throws IOException
+    {
+        adm.disableTable(tableName);
+    }
+
+    @Override
+    public void enableTable(String tableName) throws IOException
+    {
+        adm.enableTable(tableName);
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableName) throws IOException
+    {
+        return adm.isTableDisabled(tableName);
+    }
+
+    @Override
+    public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(tableName, columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
new file mode 100644
index 0000000..7e8f72d
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class HBaseAdmin1_0 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
+
+    private final Admin adm;
+
+    public HBaseAdmin1_0(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+    @Override
+    public void clearTable(String tableString, long timestamp) throws IOException
+    {
+        TableName tableName = TableName.valueOf(tableString);
+
+        if (!adm.tableExists(tableName)) {
+            log.debug("Attempted to clear table {} before it exists (noop)", tableString);
+            return;
+        }
+
+        if (!adm.isTableDisabled(tableName))
+            adm.disableTable(tableName);
+
+        if (!adm.isTableDisabled(tableName))
+            throw new RuntimeException("Unable to disable table " + tableName);
+
+        // This API call appears to both truncate and reenable the table.
+        log.info("Truncating table {}", tableName);
+        adm.truncateTable(tableName, true /* preserve splits */);
+
+        try {
+            adm.enableTable(tableName);
+        } catch (TableNotDisabledException e) {
+            // This triggers seemingly every time in testing with 1.0.2.
+            log.debug("Table automatically reenabled by truncation: {}", tableName, e);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean tableExists(String tableString) throws IOException
+    {
+        return adm.tableExists(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableString) throws IOException
+    {
+        adm.disableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void enableTable(String tableString) throws IOException
+    {
+        adm.enableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableString) throws IOException
+    {
+        return adm.isTableDisabled(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
new file mode 100644
index 0000000..c9b03aa
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+
+public interface HBaseCompat {
+
+    /**
+     * Configure the compression scheme {@code algo} on a column family
+     * descriptor {@code cd}. The {@code algo} parameter is a string value
+     * corresponding to one of the values of HBase's Compression enum. The
+     * Compression enum has moved between packages as HBase has evolved, which
+     * is why this method has a String argument in the signature instead of the
+     * enum itself.
+     *
+     * @param cd
+     *            column family to configure
+     * @param algo
+     *            compression type to use
+     */
+    public void setCompression(HColumnDescriptor cd, String algo);
+
+    /**
+     * Create and return a HTableDescriptor instance with the given name. The
+     * constructors on this method have remained stable over HBase development
+     * so far, but the old HTableDescriptor(String) constructor & byte[] friends
+     * are now marked deprecated and may eventually be removed in favor of the
+     * HTableDescriptor(TableName) constructor. That constructor (and the
+     * TableName type) only exists in newer HBase versions. Hence this method.
+     *
+     * @param tableName
+     *            HBase table name
+     * @return a new table descriptor instance
+     */
+    public HTableDescriptor newTableDescriptor(String tableName);
+
+    ConnectionMask createConnection(Configuration conf) throws IOException;
+
+    void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
+
+    void setTimestamp(Delete d, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
new file mode 100644
index 0000000..2c0f3b4
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat0_98 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection0_98(HConnectionManager.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
new file mode 100644
index 0000000..bb3fb3b
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat1_0 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
new file mode 100644
index 0000000..e5c3d31
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+import java.io.IOException;
+
+public class HBaseCompat1_1 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
new file mode 100644
index 0000000..2c0d6fe
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseCompatLoader {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
+
+    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
+
+    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
+
+    private static HBaseCompat cachedCompat;
+
+    public synchronized static HBaseCompat getCompat(String classOverride) {
+
+        if (null != cachedCompat) {
+            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
+            return cachedCompat;
+        }
+
+        HBaseCompat compat;
+        String className = null;
+        String classNameSource = null;
+
+        if (null != classOverride) {
+            className = classOverride;
+            classNameSource = "from explicit configuration";
+        } else {
+            String hbaseVersion = VersionInfo.getVersion();
+            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
+                if (hbaseVersion.startsWith(supportedVersion + ".")) {
+                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
+                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
+                    break;
+                }
+            }
+            if (null == className) {
+                log.info("The HBase version {} is not explicitly supported by Titan.  " +
+                         "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
+                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
+                className = DEFAULT_HBASE_CLASS_NAME;
+                classNameSource = " by default";
+            }
+        }
+
+        final String errTemplate = " when instantiating HBase compatibility class " + className;
+
+        try {
+            compat = (HBaseCompat)Class.forName(className).newInstance();
+            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        }
+
+        return cachedCompat = compat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
new file mode 100644
index 0000000..c5f6e0d
--- /dev/null
+++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.util.system.IOUtils;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Here are some areas that might need work:
+ * <p/>
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false)
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ * all columns on the row where it reaches its limit.  This requires getSlice,
+ * currently, to impose its limit on the client side.  That obviously won't
+ * scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ * <p/>
+ * There may be other problem areas.  These are just the ones of which I'm aware.
+ */
+public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
+
+    private final String tableName;
+    private final HBaseStoreManager storeManager;
+
+    // When using shortened CF names, columnFamily is the shortname and storeName is the longname
+    // When not using shortened CF names, they are the same
+    //private final String columnFamily;
+    private final String storeName;
+    // This is columnFamily.getBytes()
+    private final byte[] columnFamilyBytes;
+    private final HBaseGetter entryGetter;
+
+    private final ConnectionMask cnx;
+
+    private LocalLockMediator<StoreTransaction> localLockMediator;
+
+    private final Duration lockExpiryTimeMs;
+    private final Duration lockMaxWaitTimeMs;
+    private final Integer lockMaxRetries;
+
+    HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
+        this.storeManager = storeManager;
+        this.cnx = cnx;
+        this.tableName = tableName;
+        //this.columnFamily = columnFamily;
+        this.storeName = storeName;
+        this.columnFamilyBytes = columnFamily.getBytes();
+        this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
+        this.localLockMediator = llm;
+        Configuration storageConfig = storeManager.getStorageConfig();
+        this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+        this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT);
+        this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY);
+    }
+
+    @Override
+    public void close() throws BackendException {
+    }
+
+    @Override
+    public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query));
+        return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
+    }
+
+    @Override
+    public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+        return getHelper(keys, getFilter(query));
+    }
+
+    @Override
+    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
+        mutateMany(mutations, txh);
+    }
+
+    @Override
+    public void acquireLock(StaticBuffer key,
+                            StaticBuffer column,
+                            StaticBuffer expectedValue,
+                            StoreTransaction txh) throws BackendException {
+
+        KeyColumn lockID = new KeyColumn(key, column);
+        logger.debug("Attempting to acquireLock on {} ", lockID);
+        int trialCount = 0;
+        boolean locked;
+        while (trialCount < lockMaxRetries) {
+            final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs));
+            trialCount++;
+            if (!locked) {
+                handleLockFailure(txh, lockID, trialCount);
+            } else {
+                logger.debug("Acquired lock on {}, {}", lockID, txh);
+                break;
+            }
+        }
+        ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
+    }
+
+    void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException {
+        if (trialCount < lockMaxRetries) {
+            try {
+                Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.DAYS.MILLISECONDS));
+            } catch (InterruptedException e) {
+                throw new PermanentLockingException(
+                        "Interrupted while waiting for acquiring lock for transaction "
+                        + txh + " lockID " + lockID + " on retry " + trialCount, e);
+            }
+        } else {
+            throw new PermanentLockingException("Could not lock the keyColumn " +
+                    lockID + " on CF {} " + Bytes.toString(columnFamilyBytes));
+        }
+    }
+
+    @Override
+    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
+                query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY),
+                new FilterList(FilterList.Operator.MUST_PASS_ALL),
+                query);
+    }
+
+    @Override
+    public String getName() {
+        return storeName;
+    }
+
+    @Override
+    public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query);
+    }
+
+    public static Filter getFilter(SliceQuery query) {
+        byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
+        byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
+
+        Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
+
+        if (query.hasLimit()) {
+            filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    filter,
+                    new ColumnPaginationFilter(query.getLimit(), 0));
+        }
+
+        logger.debug("Generated HBase Filter {}", filter);
+
+        return filter;
+    }
+
+    private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
+        List<Get> requests = new ArrayList<Get>(keys.size());
+        {
+            for (StaticBuffer key : keys) {
+                Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
+                try {
+                    g.setTimeRange(0, Long.MAX_VALUE);
+                } catch (IOException e) {
+                    throw new PermanentBackendException(e);
+                }
+                requests.add(g);
+            }
+        }
+
+        Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
+
+        try {
+            TableMask table = null;
+            Result[] results = null;
+
+            try {
+                table = cnx.getTable(tableName);
+                logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+                results = table.get(requests);
+                logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+            } finally {
+                IOUtils.closeQuietly(table);
+            }
+
+            if (results == null)
+                return KCVSUtil.emptyResults(keys);
+
+            assert results.length==keys.size();
+
+            for (int i = 0; i < results.length; i++) {
+                Result result = results[i];
+                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
+
+                if (f == null) { // no result for this key
+                    resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
+                    continue;
+                }
+
+                // actual key with <timestamp, value>
+                NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
+                resultMap.put(keys.get(i), (r == null)
+                                            ? EntryList.EMPTY_LIST
+                                            : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
+            }
+
+            return resultMap;
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+        storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
+    }
+
+    private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
+        return executeKeySliceQuery(null, null, filters, columnSlice);
+    }
+
+    private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
+                                            @Nullable byte[] endKey,
+                                            FilterList filters,
+                                            @Nullable SliceQuery columnSlice) throws BackendException {
+        Scan scan = new Scan().addFamily(columnFamilyBytes);
+
+        try {
+            scan.setTimeRange(0, Long.MAX_VALUE);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        if (startKey != null)
+            scan.setStartRow(startKey);
+
+        if (endKey != null)
+            scan.setStopRow(endKey);
+
+        if (columnSlice != null) {
+            filters.addFilter(getFilter(columnSlice));
+        }
+
+        TableMask table = null;
+
+        logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
+        try {
+            table = cnx.getTable(tableName);
+            return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
+        } catch (IOException e) {
+            IOUtils.closeQuietly(table);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    private class RowIterator implements KeyIterator {
+        private final Closeable table;
+        private final Iterator<Result> rows;
+        private final byte[] columnFamilyBytes;
+
+        private Result currentRow;
+        private boolean isClosed;
+
+        public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
+            this.table = table;
+            this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
+            this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
+                @Override
+                public boolean apply(@Nullable Result result) {
+                    if (result == null)
+                        return false;
+
+                    try {
+                        StaticBuffer id = StaticArrayBuffer.of(result.getRow());
+                        id.getLong(0);
+                    } catch (NumberFormatException e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+        }
+
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            ensureOpen();
+
+            return new RecordIterator<Entry>() {
+                private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> currentMap = currentRow.getMap();
+                private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentMap == null ? null : currentMap.get(columnFamilyBytes).entrySet().iterator();
+
+                @Override
+                public boolean hasNext() {
+                    ensureOpen();
+                    return kv == null ? false : kv.hasNext();
+                }
+
+                @Override
+                public Entry next() {
+                    ensureOpen();
+                    return kv == null ? null : StaticArrayEntry.ofBytes(kv.next(), entryGetter);
+                }
+
+                @Override
+                public void close() {
+                    isClosed = true;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+            return rows.hasNext();
+        }
+
+        @Override
+        public StaticBuffer next() {
+            ensureOpen();
+
+            currentRow = rows.next();
+            return StaticArrayBuffer.of(currentRow.getRow());
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(table);
+            isClosed = true;
+            logger.debug("RowIterator closed table {}", table);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        private void ensureOpen() {
+            if (isClosed)
+                throw new IllegalStateException("Iterator has been closed.");
+        }
+    }
+
+    private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
+
+        private final EntryMetaData[] schema;
+
+        private HBaseGetter(EntryMetaData[] schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getKey();
+        }
+
+        @Override
+        public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getValue().lastEntry().getValue();
+        }
+
+        @Override
+        public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return schema;
+        }
+
+        @Override
+        public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
+            switch(meta) {
+                case TIMESTAMP:
+                    return element.getValue().lastEntry().getKey();
+                default:
+                    throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+            }
+        }
+    }
+}


Mime
View raw message