gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmarroq...@apache.org
Subject [1/4] gora git commit: GORA-443
Date Mon, 30 Jan 2017 15:11:35 GMT
Repository: gora
Updated Branches:
  refs/heads/GORA-443 [created] 592c4a95d


GORA-443


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/01856b56
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/01856b56
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/01856b56

Branch: refs/heads/GORA-443
Commit: 01856b565be8b6e890130dedc3bed89f63c96daa
Parents: 4192c87
Author: Renato Marroquin <marenato@inf.ethz.ch>
Authored: Wed Aug 31 13:26:51 2016 +0200
Committer: Renato Marroquin <marenato@inf.ethz.ch>
Committed: Wed Aug 31 13:26:51 2016 +0200

----------------------------------------------------------------------
 .../apache/gora/store/DataStoreTestUtil.java    |   5 +-
 .../org/apache/gora/hbase/store/HBaseStore.java |  47 ++-
 .../gora/hbase/store/HBaseTableConnection.java  | 351 ++++++-------------
 gora-hbase/src/test/conf/hbase-site.xml         |   4 +
 .../apache/gora/hbase/store/TestHBaseStore.java |  31 +-
 5 files changed, 148 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
index 551b90a..0b4fed4 100644
--- a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
+++ b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
@@ -1079,12 +1079,13 @@ public class DataStoreTestUtil {
     store.deleteByQuery(query);
     store.deleteByQuery(query);//don't you love that HBase sometimes does not delete arbitrarily
     
-    store.flush();
-
     assertNumResults(store.newQuery(), URLS.length);
 
+
+
     //assert that data is deleted
     for (int i = 0; i < URLS.length; i++) {
+      store.flush();
       WebPage page = store.get(URLS[i]);
       assertNotNull(page);
       if( URLS[i].compareTo(startKey) < 0 || URLS[i].compareTo(endKey) >= 0) {

http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
index 00fe60b..51f33d0 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
@@ -57,13 +57,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.jdom.Document;
@@ -88,7 +82,7 @@ implements Configurable {
   private static final String SCANNER_CACHING_PROPERTIES_KEY = "scanner.caching" ;
   private static final int SCANNER_CACHING_PROPERTIES_DEFAULT = 0 ;
   
-  private volatile HBaseAdmin admin;
+  private volatile Admin admin;
 
   private volatile HBaseTableConnection table;
 
@@ -110,10 +104,10 @@ implements Configurable {
   public void initialize(Class<K> keyClass, Class<T> persistentClass,
       Properties properties) {
     try {
-      
       super.initialize(keyClass, persistentClass, properties);
+
       this.conf = HBaseConfiguration.create(getConf());
-      admin = new HBaseAdmin(this.conf);
+      admin = ConnectionFactory.createConnection(getConf()).getAdmin();
       mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
       filterUtil = new HBaseFilterUtil<>(this.conf);
     } catch (FileNotFoundException ex) {
@@ -175,8 +169,8 @@ implements Configurable {
       if(!schemaExists()) {
         return;
       }
-      admin.disableTable(getSchemaName());
-      admin.deleteTable(getSchemaName());
+      admin.disableTable(mapping.getTable().getTableName());
+      admin.deleteTable(mapping.getTable().getTableName());
     } catch(IOException ex2){
       LOG.error(ex2.getMessage(), ex2);
     }
@@ -185,7 +179,7 @@ implements Configurable {
   @Override
   public boolean schemaExists() {
     try{
-      return admin.tableExists(mapping.getTableName());
+      return admin.tableExists(mapping.getTable().getTableName());
     } catch(IOException ex2){
       LOG.error(ex2.getMessage(), ex2);
       return false;
@@ -241,13 +235,14 @@ implements Configurable {
         addPutsAndDeletes(put, delete, o, field.schema().getType(),
             field.schema(), hcol, hcol.getQualifier());
       }
-      if (put.size() > 0) {
-        table.put(put);
-      }
+
       if (delete.size() > 0) {
         table.delete(delete);
-        table.delete(delete);
-        table.delete(delete); // HBase sometimes does not delete arbitrarily
+//        table.delete(delete);
+//        table.delete(delete); // HBase sometimes does not delete arbitrarily
+      }
+      if (put.size() > 0) {
+        table.put(put);
       }
     } catch (IOException ex2) {
       LOG.error(ex2.getMessage(), ex2);
@@ -260,16 +255,18 @@ implements Configurable {
     case UNION:
       if (isNullable(schema) && o == null) {
         if (qualifier == null) {
-          delete.deleteFamily(hcol.getFamily());
+//          delete.deleteFamily(hcol.getFamily());
+          delete.addFamily(hcol.getFamily());
         } else {
-          delete.deleteColumn(hcol.getFamily(), qualifier);
+//          delete.deleteColumn(hcol.getFamily(), qualifier);
+          delete.addColumn(hcol.getFamily(), qualifier);
         }
       } else {
 //        int index = GenericData.get().resolveUnion(schema, o);
         int index = getResolvedUnionIndex(schema);
         if (index > 1) {  //if more than 2 type in union, serialize directly for now
           byte[] serializedBytes = toBytes(o, schema);
-          put.add(hcol.getFamily(), qualifier, serializedBytes);
+          put.addColumn(hcol.getFamily(), qualifier, serializedBytes);
         } else {
           Schema resolvedSchema = schema.getTypes().get(index);
           addPutsAndDeletes(put, delete, o, resolvedSchema.getType(),
@@ -281,9 +278,11 @@ implements Configurable {
       // if it's a map that has been modified, then the content should be replaced by the
new one
       // This is because we don't know if the content has changed or not.
       if (qualifier == null) {
-        delete.deleteFamily(hcol.getFamily());
+        //delete.deleteFamily(hcol.getFamily());
+        delete.addFamily(hcol.getFamily());
       } else {
-        delete.deleteColumn(hcol.getFamily(), qualifier);
+        //delete.deleteColumn(hcol.getFamily(), qualifier);
+        delete.addColumn(hcol.getFamily(), qualifier);
       }
       @SuppressWarnings({ "rawtypes", "unchecked" })
       Set<Entry> set = ((Map) o).entrySet();
@@ -303,7 +302,7 @@ implements Configurable {
       break;
     default:
       byte[] serializedBytes = toBytes(o, schema);
-      put.add(hcol.getFamily(), qualifier, serializedBytes);
+      put.addColumn(hcol.getFamily(), qualifier, serializedBytes);
       break;
     }
   }

http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
index 6803827..000a8b5 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
@@ -18,63 +18,67 @@
 package org.apache.gora.hbase.store;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-
 /**
  * Thread safe implementation to connect to a HBase table.
  *
  */
-public class HBaseTableConnection implements HTableInterface {
+public class HBaseTableConnection {
   /*
    * The current implementation uses ThreadLocal HTable instances. It keeps
    * track of the floating instances in order to correctly flush and close
    * the connection when it is closed. HBase itself provides a utility called
-   * HTablePool for maintaining a pool of tables, but there are still some
+   * HTablePool for maintaining a tPool of tables, but there are still some
    * drawbacks that are only solved in later releases.
-   * 
+   *
    */
-  
+
   private final Configuration conf;
-  private final ThreadLocal<HTable> tables;
-  private final BlockingQueue<HTable> pool = new LinkedBlockingQueue<>();
+  private final Connection connection;
+  private final RegionLocator regionLocator;
+  // BufferedMutator used for doing async flush i.e. autoflush = false
+  private final ThreadLocal<ConcurrentLinkedQueue<Mutation>> buffers;
+  private final ThreadLocal<Table> tables;
+
+  private final BlockingQueue<Table> tPool = new LinkedBlockingQueue<>();
+  private final BlockingQueue<ConcurrentLinkedQueue<Mutation>> bPool = new LinkedBlockingQueue<>();
   private final boolean autoFlush;
   private final TableName tableName;
-  
+
+//  public class MutationPair {
+//    private Mutation mutation;
+//    private boolean type;
+//
+//    public void MutationPair(Mutation m, boolean t) {
+//      this.mutation = m;
+//      this.type = t;
+//    }
+//
+//    public boolean isType() {
+//      return type;
+//    }
+//
+//    public Mutation getMutation() {
+//      return mutation;
+//    }
+//  }
   /**
    * Instantiate new connection.
-   * 
+   *
    * @param conf
    * @param tableName
    * @param autoflush
@@ -83,288 +87,129 @@ public class HBaseTableConnection implements HTableInterface {
   public HBaseTableConnection(Configuration conf, String tableName, boolean autoflush)
       throws IOException {
     this.conf = conf;
+
     this.tables = new ThreadLocal<>();
+    this.buffers = new ThreadLocal<>();
+    this.connection = ConnectionFactory.createConnection(conf);
     this.tableName = TableName.valueOf(tableName);
+    this.regionLocator = this.connection.getRegionLocator(this.tableName);
+
     this.autoFlush = autoflush;
   }
-  
-  private HTable getTable() throws IOException {
-    HTable table = tables.get();
+
+  private Table getTable() throws IOException {
+    Table table = tables.get();
     if (table == null) {
-      table = new HTable(conf, tableName) {
-        @Override
-        public synchronized void flushCommits() throws RetriesExhaustedWithDetailsException,
InterruptedIOException {
-          super.flushCommits();
-        }
-      };
-      table.setAutoFlushTo(autoFlush);
-      pool.add(table); //keep track
+      table = connection.getTable(tableName);
+//      table.setAutoFlushTo(autoFlush);
+      tPool.add(table); //keep track
       tables.set(table);
     }
     return table;
   }
-  
-  @Override
+
+  private ConcurrentLinkedQueue<Mutation> getBuffer() throws IOException {
+    ConcurrentLinkedQueue<Mutation> buffer = buffers.get();
+    if (buffer == null) {
+//      BufferedMutatorParams params = new BufferedMutatorParams(this.tableName).listener(listener);
+//      buffer = connection.getBufferedMutator(this.tableName);
+      buffer = new ConcurrentLinkedQueue<>();
+      bPool.add(buffer);
+      buffers.set(buffer);
+    }
+    return buffer;
+  }
+
+  public void flushCommits() throws IOException {
+    BufferedMutator bufMutator = connection.getBufferedMutator(this.tableName);
+    for (ConcurrentLinkedQueue<Mutation> buffer : bPool) {
+      for (Mutation m: buffer) {
+        bufMutator.mutate(m);
+        bufMutator.flush();
+      }
+    }
+    bufMutator.close();
+  }
+
   public void close() throws IOException {
     // Flush and close all instances.
     // (As an extra safeguard one might employ a shared variable i.e. 'closed'
     //  in order to prevent further table creation but for now we assume that
     //  once close() is called, clients are no longer using it).
-    for (HTable table : pool) {
-      table.flushCommits();
+    flushCommits();
+
+    for (Table table : tPool) {
       table.close();
     }
   }
 
-  @Override
-  public byte[] getTableName() {
-    return tableName.getName();
-  }
-
-  @Override
   public Configuration getConfiguration() {
     return conf;
   }
 
-  @Override
-  public boolean isAutoFlush() {
-    return autoFlush;
-  }
-
   /**
-   * getStartEndKeys provided by {@link HTable} but not {@link HTableInterface}.
-   * @see HTable#getStartEndKeys()
+   * getStartEndKeys provided by {@link HRegionLocation}.
+   * @see RegionLocator#getStartEndKeys()
    */
   public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
-    return getTable().getStartEndKeys();
+    return regionLocator.getStartEndKeys();
   }
   /**
-   * getRegionLocation provided by {@link HTable} but not 
-   * {@link HTableInterface}.
-   * @see HTable#getRegionLocation(byte[])
+   * getRegionLocation provided by {@link HRegionLocation}
+   * @see RegionLocator#getRegionLocation(byte[])
    */
   public HRegionLocation getRegionLocation(final byte[] bs) throws IOException {
-    return getTable().getRegionLocation(bs);
-  }
-
-  @Override
-  public HTableDescriptor getTableDescriptor() throws IOException {
-    return getTable().getTableDescriptor();
+    return regionLocator.getRegionLocation(bs);
   }
 
-  @Override
   public boolean exists(Get get) throws IOException {
     return getTable().exists(get);
   }
 
-  @Override
+  public boolean[] existsAll(List<Get> list) throws IOException {
+    return getTable().existsAll(list);
+  }
+
   public Result get(Get get) throws IOException {
     return getTable().get(get);
   }
 
-  @Override
   public Result[] get(List<Get> gets) throws IOException {
     return getTable().get(gets);
   }
 
-  @Override
-  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-    return getTable().getRowOrBefore(row, family);
-  }
-
-  @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
     return getTable().getScanner(scan);
   }
 
-  @Override
-  public ResultScanner getScanner(byte[] family) throws IOException {
-    return getTable().getScanner(family);
-  }
-
-  @Override
-  public ResultScanner getScanner(byte[] family, byte[] qualifier)
-      throws IOException {
-    return getTable().getScanner(family, qualifier);
-  }
-
-  @Override
   public void put(Put put) throws IOException {
-    getTable().put(put);
+    getBuffer().add(put);
+//    getBuffer().flush();
+//    getTable().put(put);
   }
 
-  @Override
+//  @Override
   public void put(List<Put> puts) throws IOException {
-    getTable().put(puts);
-  }
-
-  @Override
-  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Put put) throws IOException {
-    return getTable().checkAndPut(row, family, qualifier, value, put);
+//    getTable().put(puts);
+    getBuffer().addAll(puts);
+//    getBuffer().flush();
   }
 
-  @Override
+//  @Override
   public void delete(Delete delete) throws IOException {
-    getTable().delete(delete);
+    getBuffer().add(delete);
+//    getBuffer().flush();
+//    getTable().delete(delete);
   }
 
-  @Override
+//  @Override
   public void delete(List<Delete> deletes) throws IOException {
-    getTable().delete(deletes);
-    
-  }
-
-  @Override
-  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Delete delete) throws IOException {
-    return getTable().checkAndDelete(row, family, qualifier, value, delete);
-  }
-
-  @Override
-  public Result increment(Increment increment) throws IOException {
-    return getTable().increment(increment);
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount) throws IOException {
-    return getTable().incrementColumnValue(row, family, qualifier, amount);
-  }
-
-  @Deprecated
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount, boolean writeToWAL) throws IOException {
-    return getTable().incrementColumnValue(row, family, qualifier, amount,
-        writeToWAL);
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-    for (HTable table : pool) {
-      table.flushCommits();
-    }
-  }
-
-  @Override
-  public void batch(List<? extends Row> actions, Object[] results)
-      throws IOException, InterruptedException {
-    getTable().batch(actions, results);
-    
-  }
-
-  @Override
-  public void mutateRow(RowMutations rm) throws IOException {
-    getTable().mutateRow(rm);    
-  }
-
-  @Override
-  public Result append(Append append) throws IOException {
-    return getTable().append(append);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush) {
-    // TODO Auto-generated method stub
-    
+//    getTable().delete(deletes);
+    getBuffer().addAll(deletes);
+//    getBuffer().flush();
   }
 
-  @Override
-  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail){
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public long getWriteBufferSize() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public void setWriteBufferSize(long writeBufferSize) throws IOException {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
   public TableName getName() {
     return tableName;
   }
-
-  @Override
-  public Boolean[] exists(List<Get> gets) throws IOException {
-    return getTable().exists(gets);
-  }
-
-  @Override
-  public <R> void
-      batchCallback(List<? extends Row> actions, Object[] results, Callback<R>
callback)
-          throws IOException, InterruptedException {
-    getTable().batchCallback(actions, results, callback);
-    
-  }
-
-  @Deprecated
-  @Override
-  public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R>
callback)
-      throws IOException, InterruptedException {
-    return getTable().batchCallback(actions, callback);
-  }
-
-  @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
-      Durability durability) throws IOException {
-    return getTable().incrementColumnValue(row, family, qualifier, amount,durability);
-  }
-
-  @Override
-  public CoprocessorRpcChannel coprocessorService(byte[] row) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T>
service,
-      byte[] startKey, byte[] endKey, Call<T, R> callable) throws Throwable {
-    return getTable().coprocessorService(service, startKey, endKey, callable);
-  }
-
-  @Override
-  public <T extends Service, R> void coprocessorService(Class<T> service, byte[]
startKey,
-      byte[] endKey, Call<T, R> callable, Callback<R> callback) throws Throwable
{
-    getTable().coprocessorService(service, startKey, endKey, callable, callback);
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean autoFlush) {
-    // TODO Auto-generated method stub    
-  }
-
-  @Override
-  public <R extends Message> Map<byte[], R> batchCoprocessorService(
-      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
-      R responsePrototype) throws Throwable {
-    return getTable().batchCoprocessorService(methodDescriptor, request, startKey, endKey,
responsePrototype);
-  }
-
-  @Override
-  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
-      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R>
callback)
-      throws Throwable {
-    getTable().batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
callback);
-    
-  }
-
-  @Deprecated
-  @Override
-  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException
{
-    return getTable().batch(actions);
-  }
-
-  @Override
-  public boolean checkAndMutate(byte[] arg0, byte[] arg1, byte[] arg2, CompareOp arg3, byte[]
arg4,
-      RowMutations arg5) throws IOException {
-    // TODO Auto-generated method stub
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/test/conf/hbase-site.xml
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/conf/hbase-site.xml b/gora-hbase/src/test/conf/hbase-site.xml
index 51e1346..0b506e2 100644
--- a/gora-hbase/src/test/conf/hbase-site.xml
+++ b/gora-hbase/src/test/conf/hbase-site.xml
@@ -138,4 +138,8 @@
     <value>localhost</value>
     <description>The directory shared by region servers.</description>
   </property>
+  <property>
+    <name>zookeeper.session.timeout</name>
+    <value>60000</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
index 41be132..87101ed 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java
@@ -26,9 +26,8 @@ import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.DataStoreTestBase;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -87,8 +86,10 @@ public class TestHBaseStore extends DataStoreTestBase {
   }
 
   @Override
-  public void assertPutArray() throws IOException { 
-    HTable table = new HTable(conf,"WebPage");
+  public void assertPutArray() throws IOException {
+    Connection conn = ConnectionFactory.createConnection(conf);
+    TableName webPageTab = TableName.valueOf("WebPage");
+    Table table = conn.getTable(webPageTab);
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     
@@ -100,8 +101,7 @@ public class TestHBaseStore extends DataStoreTestBase {
         ,Bytes.toBytes(3)), Bytes.toBytes("example.com")));
     table.close();
   }
-  
-  
+
   /**
    * Asserts that writing bytes actually works at low level in HBase.
    * Checks writing null unions too.
@@ -110,7 +110,10 @@ public class TestHBaseStore extends DataStoreTestBase {
   public void assertPutBytes(byte[] contentBytes) throws IOException {    
 
     // Check first the parameter "contentBytes" if written+read right.
-    HTable table = new HTable(conf,"WebPage");
+    Connection conn = ConnectionFactory.createConnection(conf);
+    TableName webPageTab = TableName.valueOf("WebPage");
+    Table table = conn.getTable(webPageTab);
+
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     
@@ -131,7 +134,7 @@ public class TestHBaseStore extends DataStoreTestBase {
     page = webPageStore.get("com.example/http") ;
     assertNull(page.getContent()) ;
     // Check directly with HBase
-    table = new HTable(conf,"WebPage");
+    table = conn.getTable(webPageTab);
     get = new Get(Bytes.toBytes("com.example/http"));
     result = table.get(get);
     actualBytes = result.getValue(Bytes.toBytes("content"), null);
@@ -148,6 +151,8 @@ public class TestHBaseStore extends DataStoreTestBase {
     page = webPageStore.get("com.example/http") ;
     assertTrue(Arrays.equals("".getBytes(Charset.defaultCharset()),page.getContent().array()))
;
     // Check directly with HBase
+
+
     table = new HTable(conf,"WebPage");
     get = new Get(Bytes.toBytes("com.example/http"));
     result = table.get(get);
@@ -206,7 +211,9 @@ public class TestHBaseStore extends DataStoreTestBase {
     webPageStore.flush() ;
     
     // Read directly from HBase
-    HTable table = new HTable(conf,"WebPage");
+    Connection conn = ConnectionFactory.createConnection(conf);
+    TableName webPageTab = TableName.valueOf("WebPage");
+    Table table = conn.getTable(webPageTab);
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     table.close();
@@ -218,7 +225,9 @@ public class TestHBaseStore extends DataStoreTestBase {
   
   @Override
   public void assertPutMap() throws IOException {
-    HTable table = new HTable(conf,"WebPage");
+    Connection conn = ConnectionFactory.createConnection(conf);
+    TableName webPageTab = TableName.valueOf("WebPage");
+    Table table = conn.getTable(webPageTab);
     Get get = new Get(Bytes.toBytes("com.example/http"));
     org.apache.hadoop.hbase.client.Result result = table.get(get);
     


Mime
View raw message