metron-commits mailing list archives

From nickal...@apache.org
Subject incubator-metron git commit: METRON-418 Set TTL on HBase Puts (nickwallen) closes apache/incubator-metron#251
Date Thu, 15 Sep 2016 19:09:11 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 728133b91 -> 1fbbf7648


METRON-418 Set TTL on HBase Puts (nickwallen) closes apache/incubator-metron#251


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1fbbf764
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1fbbf764
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1fbbf764

Branch: refs/heads/master
Commit: 1fbbf7648651efd8c8f3c944ee86b9aed4e0e279
Parents: 728133b
Author: nickwallen <nick@nickallen.org>
Authored: Thu Sep 15 15:07:36 2016 -0400
Committer: Nick Allen <nick@nickallen.org>
Committed: Thu Sep 15 15:07:36 2016 -0400

----------------------------------------------------------------------
 .../metron/profiler/client/ProfileWriter.java   |   4 +-
 .../org/apache/metron/hbase/bolt/HBaseBolt.java |  10 +-
 .../apache/metron/hbase/client/HBaseClient.java | 199 +++++++++++++++----
 .../java/org/apache/metron/hbase/Widget.java    |   9 +-
 .../apache/metron/hbase/bolt/HBaseBoltTest.java |   8 +-
 .../metron/hbase/client/HBaseClientTest.java    | 131 +++++++++---
 6 files changed, 276 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
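
For readers skimming the diff, here is a minimal usage sketch of the reworked, queue-then-submit
HBaseClient API that this commit introduces. Method names come from the diff below; the table
provider wiring, row key, and ColumnList are assumed to be built elsewhere (for example by an
HBaseMapper), so treat this as an illustration rather than production code.

  import org.apache.hadoop.hbase.client.Durability;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.metron.hbase.client.HBaseClient;
  import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
  import org.apache.storm.hbase.common.ColumnList;

  import java.util.concurrent.TimeUnit;

  public class TtlWriteSketch {

    /**
     * Queues a Put carrying a 30 day TTL, submits the batch, then reads the row back.
     */
    public static Result[] writeThenRead(HBaseClient client, byte[] rowKey, ColumnList cols,
                                         String columnFamily) {
      // queue the mutation with a time to live; nothing is written until mutate() is called
      long timeToLiveMillis = TimeUnit.DAYS.toMillis(30);
      client.addMutation(rowKey, cols, Durability.SYNC_WAL, timeToLiveMillis);

      // submit every queued mutation as one batch
      client.mutate();

      // gets are queued and submitted the same way
      HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
      criteria.addColumnFamily(columnFamily);
      client.addGet(rowKey, criteria);
      return client.getAll();
    }
  }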


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index db2f8da..eb88b5a 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -92,8 +92,8 @@ public class ProfileWriter {
     byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
     ColumnList cols = columnBuilder.columns(m);
 
-    List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
-    hbaseClient.batchMutate(mutations);
+    hbaseClient.addMutation(rowKey, cols, Durability.SKIP_WAL);
+    hbaseClient.mutate();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index 4c70ae4..a2da837 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -82,7 +82,6 @@ public class HBaseBolt extends BaseRichBolt {
    */
   protected String tableProvider = "org.apache.metron.hbase.HTableProvider";
 
-  private List<Mutation> batchMutations;
   private BatchHelper batchHelper;
   protected OutputCollector collector;
   protected transient HBaseClient hbaseClient;
@@ -90,7 +89,6 @@ public class HBaseBolt extends BaseRichBolt {
   public HBaseBolt(String tableName, HBaseMapper mapper) {
     this.tableName = tableName;
     this.mapper = mapper;
-    this.batchMutations = new LinkedList<>();
   }
 
   public HBaseBolt writeToWAL(boolean writeToWAL) {
@@ -151,7 +149,7 @@ public class HBaseBolt extends BaseRichBolt {
 
     } catch (Exception e) {
       batchHelper.fail(e);
-      batchMutations.clear();
+      hbaseClient.clearMutations();
     }
   }
 
@@ -163,8 +161,7 @@ public class HBaseBolt extends BaseRichBolt {
     byte[] rowKey = this.mapper.rowKey(tuple);
     ColumnList cols = this.mapper.columns(tuple);
     Durability durability = writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL;
-    List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, durability);
-    batchMutations.addAll(mutations);
+    hbaseClient.addMutation(rowKey, cols, durability);
     batchHelper.addBatch(tuple);
   }
 
@@ -172,9 +169,8 @@ public class HBaseBolt extends BaseRichBolt {
    * Flush all saved operations.
    */
   private void flush() {
-    this.hbaseClient.batchMutate(batchMutations);
+    this.hbaseClient.mutate();
     batchHelper.ack();
-    batchMutations.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index 888e485..e078d50 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -20,22 +20,24 @@
 
 package org.apache.metron.hbase.client;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.common.ColumnList;
-import org.apache.storm.hbase.security.HBaseSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A client that interacts with HBase.
@@ -43,19 +45,42 @@ import java.util.Map;
 public class HBaseClient implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
+
+  /**
+   * The batch of queued Mutations.
+   */
+  List<Mutation> mutations;
+
+  /**
+   * The batch of queued Gets.
+   */
+  List<Get> gets;
+
+  /**
+   * The HBase table this client interacts with.
+   */
   private HTableInterface table;
 
  public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
+    this.mutations = new ArrayList<>();
+    this.gets = new ArrayList<>();
     try {
       this.table = provider.getTable(configuration, tableName);
-
-    } catch(Exception e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  public List<Mutation> constructMutationReq(byte[] rowKey, ColumnList cols, Durability durability) {
-    List<Mutation> mutations = Lists.newArrayList();
+  /**
+   * Add a Mutation such as a Put or Increment to the batch.  The Mutation is only queued for
+   * later execution.
+   *
+   * @param rowKey     The row key of the Mutation.
+   * @param cols       The columns affected by the Mutation.
+   * @param durability The durability of the mutation.
+   * @return
+   */
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
 
     if (cols.hasColumns()) {
       Put put = createPut(rowKey, cols, durability);
@@ -70,14 +95,51 @@ public class HBaseClient implements Closeable {
     if (mutations.isEmpty()) {
       mutations.add(new Put(rowKey));
     }
+  }
+
+  /**
+   * Adds a Mutation such as a Put or Increment with a time to live.  The Mutation is only queued
+   * for later execution.
+   *
+   * @param rowKey           The row key of the Mutation.
+   * @param cols             The columns affected by the Mutation.
+   * @param durability       The durability of the mutation.
+   * @param timeToLiveMillis The time to live in milliseconds.
+   */
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
+
+    if (cols.hasColumns()) {
+      Put put = createPut(rowKey, cols, durability, timeToLiveMillis);
+      mutations.add(put);
+    }
+
+    if (cols.hasCounters()) {
+      Increment inc = createIncrement(rowKey, cols, durability, timeToLiveMillis);
+      mutations.add(inc);
+    }
+
+    if (mutations.isEmpty()) {
+      Put put = new Put(rowKey);
+      put.setTTL(timeToLiveMillis);
+      mutations.add(put);
+    }
+  }
 
-    return mutations;
+  /**
+   * Remove all queued Mutations from the batch.
+   */
+  public void clearMutations() {
+    mutations.clear();
   }
 
-  public void batchMutate(List<Mutation> mutations) {
+  /**
+   * Submits all queued Mutations.
+   */
+  public void mutate() {
     Object[] result = new Object[mutations.size()];
     try {
       table.batch(mutations, result);
+      mutations.clear();
 
     } catch (InterruptedException | IOException e) {
       LOG.warn("Error performing a mutation to HBase.", e);
@@ -85,25 +147,41 @@ public class HBaseClient implements Closeable {
     }
   }
 
-  public Get constructGetRequests(byte[] rowKey, HBaseProjectionCriteria projectionCriteria) {
+  /**
+   * Adds a Get to the batch.
+   *
+   * @param rowKey   The row key of the Get
+   * @param criteria Defines the columns/families that will be retrieved.
+   */
+  public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
     Get get = new Get(rowKey);
 
-    if (projectionCriteria != null) {
-      for (byte[] columnFamily : projectionCriteria.getColumnFamilies()) {
-        get.addFamily(columnFamily);
-      }
-
-      for (HBaseProjectionCriteria.ColumnMetaData columnMetaData : projectionCriteria.getColumns()) {
-        get.addColumn(columnMetaData.getColumnFamily(), columnMetaData.getQualifier());
-      }
+    if (criteria != null) {
+      criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
+      criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
     }
 
-    return get;
+    // queue the get
+    this.gets.add(get);
   }
 
-  public Result[] batchGet(List<Get> gets) {
+  /**
+   * Clears all queued Gets from the batch.
+   */
+  public void clearGets() {
+    gets.clear();
+  }
+
+  /**
+   * Submit all queued Gets.
+   *
+   * @return The Result of each queued Get.
+   */
+  public Result[] getAll() {
     try {
-      return table.get(gets);
+      Result[] results = table.get(gets);
+      gets.clear();
+      return results;
 
     } catch (Exception e) {
       LOG.warn("Could not perform HBase lookup.", e);
@@ -111,6 +189,9 @@ public class HBaseClient implements Closeable {
     }
   }
 
+  /**
+   * Close the table.
+   */
   @Override
   public void close() throws IOException {
     table.close();
@@ -118,48 +199,78 @@ public class HBaseClient implements Closeable {
 
   /**
    * Creates an HBase Put.
-   * @param rowKey The row key.
-   * @param cols The columns to put.
+   *
+   * @param rowKey     The row key.
+   * @param cols       The columns to put.
    * @param durability The durability of the put.
    */
   private Put createPut(byte[] rowKey, ColumnList cols, Durability durability) {
     Put put = new Put(rowKey);
     put.setDurability(durability);
+    addColumns(cols, put);
+    return put;
+  }
+
+  /**
+   * Creates an HBase Put.
+   *
+   * @param rowKey           The row key.
+   * @param cols             The columns to put.
+   * @param durability       The durability of the put.
+   * @param timeToLiveMillis The TTL in milliseconds.
+   */
+  private Put createPut(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
+    Put put = new Put(rowKey);
+    put.setDurability(durability);
+    put.setTTL(timeToLiveMillis);
+    addColumns(cols, put);
+    return put;
+  }
 
+  /**
+   * Adds the columns to the Put
+   *
+   * @param cols The columns to add.
+   * @param put  The Put.
+   */
+  private void addColumns(ColumnList cols, Put put) {
     for (ColumnList.Column col : cols.getColumns()) {
+
       if (col.getTs() > 0) {
-        put.add(col.getFamily(),
-                col.getQualifier(),
-                col.getTs(),
-                col.getValue());
+        put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
 
       } else {
-        put.add(col.getFamily(),
-                col.getQualifier(),
-                col.getValue());
+        put.add(col.getFamily(), col.getQualifier(), col.getValue());
       }
     }
-
-    return put;
   }
 
   /**
    * Creates an HBase Increment for a counter.
-   * @param rowKey The row key.
-   * @param cols The columns to include.
+   *
+   * @param rowKey     The row key.
+   * @param cols       The columns to include.
    * @param durability The durability of the increment.
    */
   private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability) {
     Increment inc = new Increment(rowKey);
     inc.setDurability(durability);
+    cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
+    return inc;
+  }
 
-    for (ColumnList.Counter cnt : cols.getCounters()) {
-      inc.addColumn(
-              cnt.getFamily(),
-              cnt.getQualifier(),
-              cnt.getIncrement());
-    }
-
+  /**
+   * Creates an HBase Increment for a counter.
+   *
+   * @param rowKey     The row key.
+   * @param cols       The columns to include.
+   * @param durability The durability of the increment.
+   */
+  private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
+    Increment inc = new Increment(rowKey);
+    inc.setDurability(durability);
+    inc.setTTL(timeToLiveMillis);
+    cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
     return inc;
   }
 }
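
For context on the mechanism, the TTL support above relies on HBase's per-mutation TTL: calling
setTTL(long) on a Put or Increment attaches a time to live, in milliseconds, to the cells that
mutation writes, and those cells expire on their own once it elapses. A minimal, hedged sketch
against the same HTableInterface/Put API used in this commit follows; the table handle, column
family, qualifier, and values are placeholders for illustration only.

  import org.apache.hadoop.hbase.client.Durability;
  import org.apache.hadoop.hbase.client.HTableInterface;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  import java.io.IOException;
  import java.util.concurrent.TimeUnit;

  public class RawTtlPutSketch {

    /**
     * Writes a single cell whose value expires roughly 30 days after the write.
     */
    public static void putWithTtl(HTableInterface table) throws IOException {
      Put put = new Put(Bytes.toBytes("widget1"));          // placeholder row key
      put.setDurability(Durability.SYNC_WAL);
      put.setTTL(TimeUnit.DAYS.toMillis(30));               // per-mutation TTL in milliseconds
      put.add(Bytes.toBytes("W"), Bytes.toBytes("name"),    // placeholder family and qualifier
              Bytes.toBytes("widget1"));
      table.put(put);
    }
  }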

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/Widget.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/Widget.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/Widget.java
index d47e332..3733e5d 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/Widget.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/Widget.java
@@ -62,7 +62,6 @@ public class Widget {
     if (o == null || getClass() != o.getClass()) return false;
 
     Widget widget = (Widget) o;
-
     if (cost != widget.cost) return false;
     return name != null ? name.equals(widget.name) : widget.name == null;
 
@@ -74,4 +73,12 @@ public class Widget {
     result = 31 * result + cost;
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "Widget{" +
+            "name='" + name + '\'' +
+            ", cost=" + cost +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index 10b0243..8b39aaa 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -97,7 +97,7 @@ public class HBaseBoltTest extends BaseBoltTest {
     bolt.execute(tuple2);
 
     // batch size is 2, received 2 tuples - flush the batch
-    verify(client, times(1)).batchMutate(any(List.class));
+    verify(client, times(1)).mutate();
   }
 
   /**
@@ -109,7 +109,7 @@ public class HBaseBoltTest extends BaseBoltTest {
     bolt.execute(tuple1);
 
     // batch size is 2, but only 1 tuple received - do not flush batch
-    verify(client, times(0)).batchMutate(any(List.class));
+    verify(client, times(0)).mutate();
   }
 
   /**
@@ -121,11 +121,11 @@ public class HBaseBoltTest extends BaseBoltTest {
 
     // the batch is not ready to write
     bolt.execute(tuple1);
-    verify(client, times(0)).batchMutate(any(List.class));
+    verify(client, times(0)).mutate();
 
     // the batch should be flushed after the tick tuple
     bolt.execute(mockTickTuple());
-    verify(client, times(1)).batchMutate(any(List.class));
+    verify(client, times(1)).mutate();
   }
 
   private static Tuple mockTuple(String componentId, String streamId) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1fbbf764/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index 925c63d..e2afd1c 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -25,9 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.hbase.Widget;
@@ -44,12 +42,16 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.apache.metron.hbase.WidgetMapper.CF;
+import static org.apache.metron.hbase.WidgetMapper.QCOST;
+import static org.apache.metron.hbase.WidgetMapper.QNAME;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertEquals;
 
 /**
  * Tests the HBaseClient
@@ -63,6 +65,10 @@ public class HBaseClientTest {
   private HTableInterface table;
   private Tuple tuple1;
   private Tuple tuple2;
+  byte[] rowKey1;
+  byte[] rowKey2;
+  ColumnList cols1;
+  ColumnList cols2;
   private Widget widget1;
   private Widget widget2;
   private HBaseMapper mapper;
@@ -90,10 +96,16 @@ public class HBaseClientTest {
     tuple1 = mock(Tuple.class);
     when(tuple1.getValueByField(eq("widget"))).thenReturn(widget1);
 
+    rowKey1 = mapper.rowKey(tuple1);
+    cols1 = mapper.columns(tuple1);
+
     // setup the second tuple
     widget2 = new Widget("widget2", 200);
     tuple2 = mock(Tuple.class);
     when(tuple2.getValueByField(eq("widget"))).thenReturn(widget2);
+
+    rowKey2 = mapper.rowKey(tuple2);
+    cols2 = mapper.columns(tuple2);
   }
 
   @Before
@@ -104,6 +116,7 @@ public class HBaseClientTest {
 
     // create the table
     table = util.createTable(Bytes.toBytes(tableName), WidgetMapper.CF);
+    util.waitTableEnabled(table.getName());
 
     // setup the client
     client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
@@ -114,21 +127,22 @@ public class HBaseClientTest {
     util.deleteTable(tableName);
   }
 
+  /**
+   * Should be able to read/write a single Widget.
+   */
   @Test
   public void testWrite() throws Exception {
 
     // add a tuple to the batch
-    byte[] rowKey1 = mapper.rowKey(tuple1);
-    ColumnList cols1 = mapper.columns(tuple1);
-    List<Mutation> mutations1 = client.constructMutationReq(rowKey1, cols1, Durability.SYNC_WAL);
-    client.batchMutate(mutations1);
+    client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
+    client.mutate();
 
     HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
     criteria.addColumnFamily(WidgetMapper.CF_STRING);
 
     // read back the tuple
-    Get get1 = client.constructGetRequests(rowKey1, criteria);
-    Result[] results = client.batchGet(Arrays.asList(get1));
+    client.addGet(rowKey1, criteria);
+    Result[] results = client.getAll();
     Assert.assertEquals(1, results.length);
 
     // validate
@@ -136,46 +150,109 @@ public class HBaseClientTest {
     assertEquals(widget1, toWidget(results[0]));
   }
 
+  /**
+   * Should be able to read/write multiple Widgets in a batch.
+   */
   @Test
   public void testBatchWrite() throws Exception {
 
-    // add a tuple to the batch
-    byte[] rowKey1 = mapper.rowKey(tuple1);
-    ColumnList cols1 = mapper.columns(tuple1);
-    List<Mutation> mutations1 = client.constructMutationReq(rowKey1, cols1, Durability.SYNC_WAL);
-    client.batchMutate(mutations1);
+    // add two mutations to the queue
+    client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
+    client.addMutation(rowKey2, cols2, Durability.SYNC_WAL);
+    client.mutate();
+
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
+    criteria.addColumnFamily(WidgetMapper.CF_STRING);
+
+    // read back both tuples
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(2, results.length);
+    List<Widget> expected = Arrays.asList(widget1, widget2);
+    for(Result result : results) {
+      Widget widget = toWidget(result);
+      Assert.assertTrue(expected.contains(widget));
+    }
+  }
+
+  /**
+   * Should be able to read back widgets that were written with a TTL 30 days out.
+   */
+  @Test
+  public void testWriteWithTimeToLive() throws Exception {
+    long timeToLive = TimeUnit.DAYS.toMillis(30);
 
-    // add another tuple to the batch
-    byte[] rowKey2 = mapper.rowKey(tuple1);
-    ColumnList cols2 = mapper.columns(tuple1);
-    List<Mutation> mutations2 = client.constructMutationReq(rowKey2, cols2, Durability.SYNC_WAL);
-    client.batchMutate(mutations2);
+    // add two mutations to the queue
+    client.addMutation(rowKey1, cols1, Durability.SYNC_WAL, timeToLive);
+    client.addMutation(rowKey2, cols2, Durability.SYNC_WAL, timeToLive);
+    client.mutate();
 
     HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
     criteria.addColumnFamily(WidgetMapper.CF_STRING);
 
     // read back both tuples
-    Get get1 = client.constructGetRequests(rowKey1, criteria);
-    Get get2 = client.constructGetRequests(rowKey2, criteria);
-    Result[] results = client.batchGet(Arrays.asList(get1, get2));
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
 
     // validate
     assertEquals(2, results.length);
     List<Widget> expected = Arrays.asList(widget1, widget2);
     for(Result result : results) {
       Widget widget = toWidget(result);
-      Assert.assertThat(expected, hasItem(widget));
+      Assert.assertTrue(expected.contains(widget));
     }
   }
 
   /**
+   * Should NOT be able to read widgets that are expired due to the TTL.
+   */
+  @Test
+  public void testExpiredWidgets() throws Exception {
+    long timeToLive = TimeUnit.MILLISECONDS.toMillis(1);
+
+    // add two mutations to the queue
+    client.addMutation(rowKey1, cols1, Durability.SYNC_WAL, timeToLive);
+    client.addMutation(rowKey2, cols2, Durability.SYNC_WAL, timeToLive);
+    client.mutate();
+
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
+    criteria.addColumnFamily(WidgetMapper.CF_STRING);
+
+    // wait for a second to ensure the TTL has expired
+    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+
+    // read back both tuples
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
+
+    // validate - the TTL should have expired all widgets
+    List<Widget> widgets = Arrays
+            .stream(results)
+            .map(r -> toWidget(r))
+            .filter(w -> w != null)
+            .collect(Collectors.toList());
+    assertEquals(0, widgets.size());
+  }
+
+  /**
    * Transforms the HBase Result to a Widget.
    * @param result The HBase Result.
    * @return The Widget.
    */
   private Widget toWidget(Result result) {
-    int cost = Bytes.toInt(result.getValue(WidgetMapper.CF, WidgetMapper.QCOST));
-    String name = Bytes.toString(result.getValue(WidgetMapper.CF, WidgetMapper.QNAME));
-    return new Widget(name, cost);
+    Widget widget = null;
+
+    if(result.containsColumn(CF, QCOST) && result.containsColumn(CF, QNAME)) {
+      String name = Bytes.toString(result.getValue(CF, QNAME));
+      int cost = Bytes.toInt(result.getValue(CF, QCOST));
+      widget = new Widget(name, cost);
+    }
+
+    return widget;
   }
 }

