incubator-gora-commits mailing list archives

From: lewi...@apache.org
Subject: svn commit: r1203184 - in /incubator/gora/trunk: ./ gora-hbase/src/main/java/org/apache/gora/hbase/store/
Date: Thu, 17 Nov 2011 13:37:36 GMT
Author: lewismc
Date: Thu Nov 17 13:37:35 2011
New Revision: 1203184

URL: http://svn.apache.org/viewvc?rev=1203184&view=rev
Log:
commit to address GORA-56, update to changes.txt and thank you to Ferdy

Added:
    incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
Modified:
    incubator/gora/trunk/CHANGES.txt
    incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
    incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java
    incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java

Modified: incubator/gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/CHANGES.txt?rev=1203184&r1=1203183&r2=1203184&view=diff
==============================================================================
--- incubator/gora/trunk/CHANGES.txt (original)
+++ incubator/gora/trunk/CHANGES.txt Thu Nov 17 13:37:35 2011
@@ -2,6 +2,8 @@ Gora Change Log
 
 Trunk (unreleased changes):
 
+* GORA-56 HBaseStore is not thread safe. (Ferdy via lewismc)
+
 * GORA-48. HBaseStore initialization of table without configuration in constructor will throw Exception (Ferdy via lewismc)
 
 * GORA-47&46. fix tar ant target & Add nightly target to build.xml respectively (lewismc)

Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1203184&r1=1203183&r2=1203184&view=diff
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Thu Nov 17 13:37:35 2011
@@ -24,24 +24,31 @@ import java.util.Arrays;
  */
 class HBaseColumn {
   
-  String tableName;
-  byte[] family;
-  byte[] qualifier;
+  final String tableName;
+  final byte[] family;
+  final byte[] qualifier;
   
   public HBaseColumn(String tableName, byte[] family, byte[] qualifier) {
     this.tableName = tableName;
-    this.family = family;
-    this.qualifier = qualifier;
+    this.family = family==null ? null : Arrays.copyOf(family, family.length);
+    this.qualifier = qualifier==null ? null : 
+      Arrays.copyOf(qualifier, qualifier.length);
   }
 
   public String getTableName() {
     return tableName;
   }
   
+  /**
+   * @return the family (internal array returned; do not modify)
+   */
   public byte[] getFamily() {
     return family;
   }
 
+  /**
+   * @return the qualifier (internal array returned; do not modify)
+   */
   public byte[] getQualifier() {
     return qualifier;
   }
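
The constructor above now stores defensive copies of the caller's byte arrays, so a caller that later mutates its own arrays can no longer corrupt a shared HBaseColumn. A minimal sketch of the effect follows; the demo class, table name "webpage", family "info" and qualifier "t" are hypothetical, and the class must live in org.apache.gora.hbase.store because HBaseColumn is package-private.

package org.apache.gora.hbase.store;

import java.util.Arrays;

import org.apache.hadoop.hbase.util.Bytes;

public class DefensiveCopyDemo {
  public static void main(String[] args) {
    byte[] family = Bytes.toBytes("info");                       // hypothetical family name
    HBaseColumn column =
        new HBaseColumn("webpage", family, Bytes.toBytes("t"));  // hypothetical table/qualifier
    Arrays.fill(family, (byte) 0);                               // caller mutates its own array
    // The column is unaffected because the constructor stored a copy.
    System.out.println(Bytes.toString(column.getFamily()));      // prints "info"
  }
}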

Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java?rev=1203184&r1=1203183&r2=1203184&view=diff
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java (original)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java Thu Nov 17 13:37:35 2011
@@ -28,37 +28,30 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Mapping definitions for HBase
+ * Mapping definitions for HBase. Thread safe.
  */
 public class HBaseMapping {
 
-  private Map<String, HTableDescriptor> tableDescriptors 
-    = new HashMap<String, HTableDescriptor>();
+  private final Map<String, HTableDescriptor> tableDescriptors;
   
   //name of the primary table
-  private String tableName; 
+  private final String tableName; 
   
   // a map from field name to hbase column
-  private Map<String, HBaseColumn> columnMap = 
-    new HashMap<String, HBaseColumn>();
+  private final Map<String, HBaseColumn> columnMap;
   
-  public HBaseMapping() {
-  }
-  
-  public void setTableName(String tableName) {
+  public HBaseMapping(Map<String, HTableDescriptor> tableDescriptors,
+      String tableName, Map<String, HBaseColumn> columnMap) {
+    super();
+    this.tableDescriptors = tableDescriptors;
     this.tableName = tableName;
+    this.columnMap = columnMap;
   }
-  
+
   public String getTableName() {
     return tableName;
   }
   
-  public void addTable(String tableName) {
-    if(!tableDescriptors.containsKey(tableName)) {
-      tableDescriptors.put(tableName, new HTableDescriptor(tableName));
-    }
-  }
-  
   public HTableDescriptor getTable() {
     return getTable(tableName);
   }
@@ -67,49 +60,94 @@ public class HBaseMapping {
     return tableDescriptors.get(tableName);
   }
   
-  public void addColumnFamily(String tableName, String familyName
-      , String compression, String blockCache, String blockSize, String bloomFilter
-      , String maxVersions, String timeToLive, String inMemory) {
-    
-    HColumnDescriptor columnDescriptor = addColumnFamily(tableName, familyName);
-    
-    if(compression != null)
-      columnDescriptor.setCompressionType(Algorithm.valueOf(compression));
-    if(blockCache != null)
-      columnDescriptor.setBlockCacheEnabled(Boolean.parseBoolean(blockCache));
-    if(blockSize != null)
-      columnDescriptor.setBlocksize(Integer.parseInt(blockSize));
-    if(bloomFilter != null)
-      columnDescriptor.setBloomFilterType(BloomType.valueOf(bloomFilter));
-    if(maxVersions != null)
-      columnDescriptor.setMaxVersions(Integer.parseInt(maxVersions));
-    if(timeToLive != null)
-      columnDescriptor.setTimeToLive(Integer.parseInt(timeToLive));
-    if(inMemory != null)
-      columnDescriptor.setInMemory(Boolean.parseBoolean(inMemory));
-    
-    getTable(tableName).addFamily(columnDescriptor);
-  }
-  
-  public HColumnDescriptor addColumnFamily(String tableName, String familyName) {
-    HTableDescriptor tableDescriptor = getTable(tableName);
-    HColumnDescriptor columnDescriptor =  tableDescriptor.getFamily(Bytes.toBytes(familyName));
-    if(columnDescriptor == null) {
-      columnDescriptor = new HColumnDescriptor(familyName);
-      tableDescriptor.addFamily(columnDescriptor);
-    }
-    return columnDescriptor;
-  }
-  
-  public void addField(String fieldName, String tableName, String family, String qualifier) {
-    byte[] familyBytes = Bytes.toBytes(family);
-    byte[] qualifierBytes = qualifier == null ? null : Bytes.toBytes(qualifier);
-    
-    HBaseColumn column = new HBaseColumn(tableName, familyBytes, qualifierBytes);
-    columnMap.put(fieldName, column);
-  }
- 
   public HBaseColumn getColumn(String fieldName) {
     return columnMap.get(fieldName);
   }
+  
+  /**
+   * A builder for creating the mapping. This allows building a thread-safe
+   * {@link HBaseMapping} through simple immutability.
+   */
+  public static class HBaseMappingBuilder {
+    private Map<String, HTableDescriptor> tableDescriptors 
+      = new HashMap<String, HTableDescriptor>();
+    private String tableName; 
+    private Map<String, HBaseColumn> columnMap = 
+      new HashMap<String, HBaseColumn>();
+    
+    public void addTable(String tableName) {
+      if(!tableDescriptors.containsKey(tableName)) {
+        tableDescriptors.put(tableName, new HTableDescriptor(tableName));
+      }
+    }
+    
+    public String getTableName() {
+      return tableName;
+    }
+    
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+    
+    public void addColumnFamily(String tableName, String familyName,
+        String compression, String blockCache, String blockSize,
+        String bloomFilter, String maxVersions, String timeToLive,
+        String inMemory) {
+      
+      HColumnDescriptor columnDescriptor = addColumnFamily(tableName, 
+          familyName);
+      
+      if(compression != null)
+        columnDescriptor.setCompressionType(Algorithm.valueOf(compression));
+      if(blockCache != null)
+        columnDescriptor.setBlockCacheEnabled(Boolean.parseBoolean(blockCache));
+      if(blockSize != null)
+        columnDescriptor.setBlocksize(Integer.parseInt(blockSize));
+      if(bloomFilter != null)
+        columnDescriptor.setBloomFilterType(BloomType.valueOf(bloomFilter));
+      if(maxVersions != null)
+        columnDescriptor.setMaxVersions(Integer.parseInt(maxVersions));
+      if(timeToLive != null)
+        columnDescriptor.setTimeToLive(Integer.parseInt(timeToLive));
+      if(inMemory != null)
+        columnDescriptor.setInMemory(Boolean.parseBoolean(inMemory));
+      
+      getTable(tableName).addFamily(columnDescriptor);
+    }
+    
+    public HTableDescriptor getTable(String tableName) {
+      return tableDescriptors.get(tableName);
+    }
+    
+    public HColumnDescriptor addColumnFamily(String tableName, 
+        String familyName) {
+      HTableDescriptor tableDescriptor = getTable(tableName);
+      HColumnDescriptor columnDescriptor =  tableDescriptor.getFamily(
+          Bytes.toBytes(familyName));
+      if(columnDescriptor == null) {
+        columnDescriptor = new HColumnDescriptor(familyName);
+        tableDescriptor.addFamily(columnDescriptor);
+      }
+      return columnDescriptor;
+    }
+    
+    public void addField(String fieldName, String tableName, String family, 
+        String qualifier) {
+      byte[] familyBytes = Bytes.toBytes(family);
+      byte[] qualifierBytes = qualifier == null ? null : 
+        Bytes.toBytes(qualifier);
+      
+      HBaseColumn column = new HBaseColumn(tableName, familyBytes, 
+          qualifierBytes);
+      columnMap.put(fieldName, column);
+    }
+    
+    /**
+     * @return A newly constructed mapping.
+     */
+    public HBaseMapping build() {
+      return new HBaseMapping(tableDescriptors, tableName, columnMap);
+    }
+  }
 }
\ No newline at end of file
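
Because HBaseMapping is now immutable, all configuration goes through HBaseMappingBuilder before build() is called. A hypothetical usage sketch is shown below; the table, family and field names are made up, and in HBaseStore the builder is actually populated from the mapping XML rather than by hand.

import org.apache.gora.hbase.store.HBaseMapping;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;

public class MappingBuilderDemo {
  public static void main(String[] args) {
    HBaseMappingBuilder builder = new HBaseMappingBuilder();

    // Register the primary table and one column family. Passing null for a
    // tuning option means "leave the HBase default", as in the builder code.
    builder.addTable("webpage");                      // hypothetical table name
    builder.setTableName("webpage");
    builder.addColumnFamily("webpage", "common",
        null, null, null, null, null, null, null);

    // Map a persistent field onto family:qualifier.
    builder.addField("title", builder.getTableName(), "common", "title");

    // From here on the mapping is immutable and safe to share across threads.
    HBaseMapping mapping = builder.build();
    System.out.println(mapping.getTableName());
  }
}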

Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1203184&r1=1203183&r2=1203184&view=diff
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Thu Nov 17 13:37:35 2011
@@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.gora.hbase.query.HBaseGetResult;
 import org.apache.gora.hbase.query.HBaseQuery;
 import org.apache.gora.hbase.query.HBaseScannerResult;
+import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
 import org.apache.gora.hbase.util.HBaseByteInterface;
 import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.Persistent;
@@ -62,7 +63,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -74,8 +74,7 @@ import org.jdom.Element;
 import org.jdom.input.SAXBuilder;
 
 /**
- * DataStore for HBase.
- * <p> Note: HBaseStore is not yet thread-safe.
+ * DataStore for HBase. Thread safe.
  *
  */
 public class HBaseStore<K, T extends Persistent> extends DataStoreBase<K, T>
@@ -89,15 +88,15 @@ implements Configurable {
   private static final String DEPRECATED_MAPPING_FILE = "hbase-mapping.xml";
   public static final String DEFAULT_MAPPING_FILE = "gora-hbase-mapping.xml";
 
-  private HBaseAdmin admin;
+  private volatile HBaseAdmin admin;
 
-  private HTable table;
+  private volatile HBaseTableConnection table;
 
-  private Configuration conf;
+  private volatile Configuration conf;
 
-  private boolean autoCreateSchema = true;
+  private final boolean autoCreateSchema = true;
 
-  private HBaseMapping mapping;
+  private volatile HBaseMapping mapping;
 
   public HBaseStore()  {
   }
@@ -131,7 +130,7 @@ implements Configurable {
       createSchema();
     }
 
-    table = new HTable(conf, mapping.getTableName());
+    table = new HBaseTableConnection(getConf(), mapping.getTableName(), true);
   }
 
   @Override
@@ -152,9 +151,6 @@ implements Configurable {
   @Override
   public void deleteSchema() throws IOException {
     if(!admin.tableExists(mapping.getTableName())) {
-      if(table != null) {
-        table.getWriteBuffer().clear();
-      }
       return;
     }
     admin.disableTable(mapping.getTableName());
@@ -521,7 +517,7 @@ implements Configurable {
   @SuppressWarnings("unchecked")
   private HBaseMapping readMapping(String filename) throws IOException {
 
-    HBaseMapping mapping = new HBaseMapping();
+    HBaseMappingBuilder mappingBuilder = new HBaseMappingBuilder();
 
     try {
       SAXBuilder builder = new SAXBuilder();
@@ -532,7 +528,7 @@ implements Configurable {
       List<Element> tableElements = root.getChildren("table");
       for(Element tableElement : tableElements) {
         String tableName = tableElement.getAttributeValue("name");
-        mapping.addTable(tableName);
+        mappingBuilder.addTable(tableName);
 
         List<Element> fieldElements = tableElement.getChildren("field");
         for(Element fieldElement : fieldElements) {
@@ -545,28 +541,33 @@ implements Configurable {
           String timeToLive  = fieldElement.getAttributeValue("timeToLive");
           String inMemory    = fieldElement.getAttributeValue("inMemory");
 
-          mapping.addColumnFamily(tableName, familyName, compression, blockCache, blockSize
-              , bloomFilter, maxVersions, timeToLive, inMemory);
+          mappingBuilder.addColumnFamily(tableName, familyName, compression, 
+              blockCache, blockSize, bloomFilter, maxVersions, timeToLive, 
+              inMemory);
         }
       }
 
       List<Element> classElements = root.getChildren("class");
       for(Element classElement: classElements) {
-        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
+        if(classElement.getAttributeValue("keyClass").equals(
+            keyClass.getCanonicalName())
             && classElement.getAttributeValue("name").equals(
                 persistentClass.getCanonicalName())) {
 
-          String tableName = getSchemaName(classElement.getAttributeValue("table"), persistentClass);
-          mapping.addTable(tableName);
-          mapping.setTableName(tableName);
+          String tableName = getSchemaName(
+              classElement.getAttributeValue("table"), persistentClass);
+          mappingBuilder.addTable(tableName);
+          mappingBuilder.setTableName(tableName);
 
           List<Element> fields = classElement.getChildren("field");
           for(Element field:fields) {
             String fieldName =  field.getAttributeValue("name");
             String family =  field.getAttributeValue("family");
             String qualifier = field.getAttributeValue("qualifier");
-            mapping.addField(fieldName, mapping.getTableName(), family, qualifier);
-            mapping.addColumnFamily(mapping.getTableName(), family);//implicit family definition
+            mappingBuilder.addField(fieldName, mappingBuilder.getTableName(), 
+                family, qualifier);
+            mappingBuilder.addColumnFamily(mappingBuilder.getTableName(), 
+                family);//implicit family definition
           }
 
           break;
@@ -578,14 +579,12 @@ implements Configurable {
       throw new IOException(ex);
     }
 
-    return mapping;
+    return mappingBuilder.build();
   }
 
   @Override
   public void close() throws IOException {
-    flush();
-    if(table != null)
-      table.close();
+    table.close();
   }
 
   @Override
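
HBaseStore's fields become volatile in this change because they are assigned once during initialization and afterwards only read, possibly from other threads. A simplified, self-contained illustration of that safe-publication idiom follows; it is not the actual HBaseStore, and the class and field names are invented.

public class VolatilePublicationDemo {
  // Stands in for HBaseStore's volatile 'mapping'/'table'/'conf' fields.
  private volatile String tableName;

  public void initialize(String name) {
    this.tableName = name;   // written once, then treated as read-only
  }

  public static void main(String[] args) throws Exception {
    final VolatilePublicationDemo demo = new VolatilePublicationDemo();
    Thread worker = new Thread(new Runnable() {
      public void run() {
        // Without volatile this reader could, in principle, never observe
        // the write performed by the initializing thread.
        while (demo.tableName == null) { }
        System.out.println("worker sees: " + demo.tableName);
      }
    });
    worker.start();
    Thread.sleep(50);        // give the worker a chance to start spinning
    demo.initialize("webpage");
    worker.join();
  }
}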

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java?rev=1203184&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Thu Nov 17 13:37:35 2011
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.hbase.store;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Thread safe implementation to connect to a HBase table.
+ *
+ */
+public class HBaseTableConnection implements HTableInterface{
+  /*
+   * The current implementation uses ThreadLocal HTable instances. It keeps
+   * track of the floating instances in order to correctly flush and close
+   * the connection when it is closed. HBase itself provides a utility called
+   * HTablePool for maintaining a pool of tables, but there are still some
+   * drawbacks that are only solved in later releases.
+   * 
+   */
+  
+  private final Configuration conf;
+  private final ThreadLocal<HTable> tables;
+  private final BlockingQueue<HTable> pool = new LinkedBlockingQueue<HTable>();
+  private final boolean autoflush;
+  private final String tableName;
+  
+  /**
+   * Instantiate new connection.
+   * 
+   * @param conf
+   * @param tableName
+   * @param autoflush
+   * @throws IOException
+   */
+  public HBaseTableConnection(Configuration conf, String tableName, 
+      boolean autoflush) throws IOException {
+    this.conf = conf;
+    this.tables = new ThreadLocal<HTable>();
+    this.tableName = tableName;
+    this.autoflush = autoflush;
+  }
+  
+  private HTable getTable() throws IOException {
+    HTable table = tables.get();
+    if (table == null) {
+      table = new HTable(conf, tableName);
+      table.setAutoFlush(autoflush);
+      pool.add(table); //keep track
+      tables.set(table);
+    }
+    return table;
+  }
+  
+  @Override
+  public void close() throws IOException {
+    // Flush and close all instances.
+    // (As an extra safeguard one might employ a shared variable i.e. 'closed'
+    //  in order to prevent further table creation but for now we assume that
+    //  once close() is called, clients are no longer using it).
+    for (HTable table : pool) {
+      table.flushCommits();
+      table.close();
+    }
+  }
+
+  @Override
+  public byte[] getTableName() {
+    return Bytes.toBytes(tableName);
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return autoflush;
+  }
+
+  /**
+   * getStartEndKeys provided by {@link HTable} but not {@link HTableInterface}.
+   * @see HTable#getStartEndKeys()
+   */
+  public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+    return getTable().getStartEndKeys();
+  }
+  /**
+   * getRegionLocation provided by {@link HTable} but not 
+   * {@link HTableInterface}.
+   * @see HTable#getRegionLocation(byte[])
+   */
+  public HRegionLocation getRegionLocation(final byte[] bs) throws IOException {
+    return getTable().getRegionLocation(bs);
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    return getTable().getTableDescriptor();
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    return getTable().exists(get);
+  }
+
+  @Override
+  public void batch(List<Row> actions, Object[] results) throws IOException,
+      InterruptedException {
+    getTable().batch(actions, results);
+  }
+
+  @Override
+  public Object[] batch(List<Row> actions) throws IOException,
+      InterruptedException {
+    return getTable().batch(actions);
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    return getTable().get(get);
+  }
+
+  @Override
+  public Result[] get(List<Get> gets) throws IOException {
+    return getTable().get(gets);
+  }
+
+  @Override
+  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+    return getTable().getRowOrBefore(row, family);
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    return getTable().getScanner(scan);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    return getTable().getScanner(family);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier)
+      throws IOException {
+    return getTable().getScanner(family, qualifier);
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    getTable().put(put);
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    getTable().put(puts);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Put put) throws IOException {
+    return getTable().checkAndPut(row, family, qualifier, value, put);
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    getTable().delete(delete);
+  }
+
+  @Override
+  public void delete(List<Delete> deletes) throws IOException {
+    getTable().delete(deletes);
+    
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Delete delete) throws IOException {
+    return getTable().checkAndDelete(row, family, qualifier, value, delete);
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    return getTable().increment(increment);
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount) throws IOException {
+    return getTable().incrementColumnValue(row, family, qualifier, amount);
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount, boolean writeToWAL) throws IOException {
+    return getTable().incrementColumnValue(row, family, qualifier, amount,
+        writeToWAL);
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+    getTable().flushCommits();
+  }
+
+  @Override
+  public RowLock lockRow(byte[] row) throws IOException {
+    return getTable().lockRow(row);
+  }
+
+  @Override
+  public void unlockRow(RowLock rl) throws IOException {
+    getTable().unlockRow(rl);
+  }
+}
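
A hypothetical sketch of how the new connection class could be exercised from several threads at once: each thread lazily obtains its own HTable through the ThreadLocal, and close() flushes and closes all of them. It assumes a reachable HBase instance and a pre-existing table; the table, family, qualifier and row names below are made up.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.gora.hbase.store.HBaseTableConnection;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionDemo {
  public static void main(String[] args) throws Exception {
    // autoflush=false: writes are buffered per thread and flushed on close().
    final HBaseTableConnection conn =
        new HBaseTableConnection(HBaseConfiguration.create(), "demo_table", false);
    ExecutorService executor = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
      final int id = i;
      executor.execute(new Runnable() {
        public void run() {
          try {
            Put put = new Put(Bytes.toBytes("row-" + id));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v-" + id));
            conn.put(put);   // each thread writes through its own HTable
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
    conn.close();            // flushes pending writes and closes every per-thread HTable
  }
}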


