hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-19427 Add TimeRange support into Append to optimize for counters
Date Wed, 13 Dec 2017 07:44:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 1b822b60b -> 677c1f2c6


HBASE-19427 Add TimeRange support into Append to optimize for counters


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/677c1f2c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/677c1f2c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/677c1f2c

Branch: refs/heads/master
Commit: 677c1f2c635273eb823b91903dffdb2e587f5181
Parents: 1b822b6
Author: Chia-Ping Tsai <chia7712@gmail.com>
Authored: Wed Dec 13 15:35:49 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Wed Dec 13 15:35:49 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Append.java  |  39 +++++
 .../apache/hadoop/hbase/client/Increment.java   |   2 +
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  16 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |  14 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  15 +-
 .../hbase/security/access/AccessController.java |   2 +-
 .../hbase/coprocessor/TestAppendTimeRange.java  | 159 +++++++++++++++++++
 7 files changed, 240 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 89ea082..da07ea6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -24,9 +24,11 @@ import java.util.UUID;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -42,6 +44,42 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Public
 public class Append extends Mutation {
+  private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
+  private TimeRange tr = new TimeRange();
+
+  /**
+   * Sets the TimeRange to be used on the Get for this append.
+   * <p>
+   * This is useful for when you have counters that only last for specific
+   * periods of time (ie. counters that are partitioned by time).  By setting
+   * the range of valid times for this append, you can potentially gain
+   * some performance with a more optimal Get operation.
+   * Be careful adding the time range to this class as you will update the old cell if the
+   * time range doesn't include the latest cells.
+   * <p>
+   * This range is used as [minStamp, maxStamp).
+   * @param minStamp minimum timestamp value, inclusive
+   * @param maxStamp maximum timestamp value, exclusive
+   * @return this
+   */
+  public Append setTimeRange(long minStamp, long maxStamp) {
+    tr = new TimeRange(minStamp, maxStamp);
+    return this;
+  }
+
+  /**
+   * Gets the TimeRange used for this append.
+   * @return TimeRange
+   */
+  public TimeRange getTimeRange() {
+    return this.tr;
+  }
+
+  @Override
+  protected long extraHeapSize(){
+    return HEAP_OVERHEAD;
+  }
+
   /**
    * @param returnResults
    *          True (default) if the append operation should return the results.
@@ -77,6 +115,7 @@ public class Append extends Mutation {
   public Append(Append a) {
     this.row = a.getRow();
     this.ts = a.getTimeStamp();
+    this.tr = a.getTimeRange();
     this.familyMap.putAll(a.getFamilyCellMap());
     for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 2f8dd26..52c0c59 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -138,6 +138,8 @@ public class Increment extends Mutation implements Comparable<Row>
{
    * periods of time (ie. counters that are partitioned by time).  By setting
    * the range of valid times for this increment, you can potentially gain
    * some performance with a more optimal Get operation.
+   * Be careful adding the time range to this class as you will update the old cell if the
+   * time range doesn't include the latest cells.
    * <p>
    * This range is used as [minStamp, maxStamp).
    * @param minStamp minimum timestamp value, inclusive

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index bc49cb0..67fea5f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -714,8 +714,13 @@ public final class ProtobufUtil {
           throws IOException {
     MutationType type = proto.getMutateType();
     assert type == MutationType.APPEND : type.name();
-    return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
-            Append::add, proto, cellScanner);
+    Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
+        Append::add, proto, cellScanner);
+    if (proto.hasTimeRange()) {
+      TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+      append.setTimeRange(timeRange.getMin(), timeRange.getMax());
+    }
+    return append;
   }
 
   /**
@@ -1175,6 +1180,10 @@ public final class ProtobufUtil {
       TimeRange timeRange = ((Increment) mutation).getTimeRange();
       setTimeRange(builder, timeRange);
     }
+    if (type == MutationType.APPEND) {
+      TimeRange timeRange = ((Append) mutation).getTimeRange();
+      setTimeRange(builder, timeRange);
+    }
     ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
     QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
     for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet())
{
@@ -1233,6 +1242,9 @@ public final class ProtobufUtil {
     if (mutation instanceof Increment) {
       setTimeRange(builder, ((Increment)mutation).getTimeRange());
     }
+    if (mutation instanceof Append) {
+      setTimeRange(builder, ((Append)mutation).getTimeRange());
+    }
     if (nonce != HConstants.NO_NONCE) {
       builder.setNonce(nonce);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 5971b3c..f856c7e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -856,8 +856,13 @@ public final class ProtobufUtil {
           throws IOException {
     MutationType type = proto.getMutateType();
     assert type == MutationType.APPEND : type.name();
-    return toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
+    Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
             Append::add, proto, cellScanner);
+    if (proto.hasTimeRange()) {
+      TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+      append.setTimeRange(timeRange.getMin(), timeRange.getMax());
+    }
+    return append;
   }
 
   /**
@@ -1340,6 +1345,10 @@ public final class ProtobufUtil {
       TimeRange timeRange = ((Increment) mutation).getTimeRange();
       setTimeRange(builder, timeRange);
     }
+    if (type == MutationType.APPEND) {
+      TimeRange timeRange = ((Append) mutation).getTimeRange();
+      setTimeRange(builder, timeRange);
+    }
     ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
     QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
     for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet())
{
@@ -1398,6 +1407,9 @@ public final class ProtobufUtil {
     if (mutation instanceof Increment) {
       setTimeRange(builder, ((Increment)mutation).getTimeRange());
     }
+    if (mutation instanceof Append) {
+      setTimeRange(builder, ((Append)mutation).getTimeRange());
+    }
     if (nonce != HConstants.NO_NONCE) {
       builder.setNonce(nonce);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 85c12e9..8ca1184 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7672,9 +7672,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
     byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
     List<Cell> toApply = new ArrayList<>(deltas.size());
     // Get previous values for all columns in this family.
-    List<Cell> currentValues = get(mutation, store, deltas,
-        null/*Default IsolationLevel*/,
-        op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null);
+    TimeRange tr = null;
+    switch (op) {
+      case INCREMENT:
+        tr = ((Increment)mutation).getTimeRange();
+        break;
+      case APPEND:
+        tr = ((Append)mutation).getTimeRange();
+        break;
+      default:
+        break;
+    }
+    List<Cell> currentValues = get(mutation, store, deltas,null, tr);
     // Iterate the input columns and update existing values if they were found, otherwise
     // add new column initialized to the delta amount
     int currentValuesIndex = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 2e2d263..0f9d8a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -1952,7 +1952,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
       AuthResult authResult = null;
       User user = getActiveUser(c);
       if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
-          append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
+          append.getFamilyCellMap(), append.getTimeRange().getMax(), Action.WRITE)) {
         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
             user, Action.WRITE, table, append.getFamilyCellMap());
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/677c1f2c/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java
new file mode 100644
index 0000000..8842075
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestAppendTimeRange {
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  private static final byte[] ROW = Bytes.toBytes("aaa");
+
+  private static final byte[] QUAL = Bytes.toBytes("col1");
+
+  private static final byte[] VALUE = Bytes.toBytes("1");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MyObserver.class.getName());
+    // Make general delay zero rather than default. Timing is off in this
+    // test that depends on an evironment edge that is manually moved forward.
+    util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0);
+    util.startMiniCluster();
+    EnvironmentEdgeManager.injectEdge(mee);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  public static class MyObserver implements RegionCoprocessor, RegionObserver {
+    private static TimeRange tr10 = null;
+    private static TimeRange tr2 = null;
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public Result preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Append append) throws IOException {
+      NavigableMap<byte [], List<Cell>> map = append.getFamilyCellMap();
+      for (Map.Entry<byte [], List<Cell>> entry : map.entrySet()) {
+        for (Cell cell : entry.getValue()) {
+          String appendStr = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
+              cell.getValueLength());
+          if (appendStr.equals("b")) {
+            tr10 = append.getTimeRange();
+          } else if (appendStr.equals("c") && !append.getTimeRange().isAllTime())
{
+            tr2 = append.getTimeRange();
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  @Test
+  public void testHTableInterfaceMethods() throws Exception {
+    try (Table table = util.createTable(TableName.valueOf(name.getMethodName()), TEST_FAMILY))
{
+      table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, VALUE));
+      long time = EnvironmentEdgeManager.currentTime();
+      mee.setValue(time);
+      table.put(new Put(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("a")));
+      checkRowValue(table, ROW, Bytes.toBytes("a"));
+
+      time = EnvironmentEdgeManager.currentTime();
+      mee.setValue(time);
+      TimeRange range10 = new TimeRange(1, time + 10);
+      Result r = table.append(new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("b"))
+          .setTimeRange(range10.getMin(), range10.getMax()));
+      checkRowValue(table, ROW, Bytes.toBytes("ab"));
+      assertEquals(MyObserver.tr10.getMin(), range10.getMin());
+      assertEquals(MyObserver.tr10.getMax(), range10.getMax());
+      time = EnvironmentEdgeManager.currentTime();
+      mee.setValue(time);
+      TimeRange range2 = new TimeRange(1, time+20);
+      List<Row> actions =
+          Arrays.asList(new Row[] {
+              new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c"))
+                  .setTimeRange(range2.getMin(), range2.getMax()),
+              new Append(ROW).addColumn(TEST_FAMILY, QUAL, Bytes.toBytes("c"))
+                  .setTimeRange(range2.getMin(), range2.getMax()) });
+      Object[] results1 = new Object[actions.size()];
+      table.batch(actions, results1);
+      assertEquals(MyObserver.tr2.getMin(), range2.getMin());
+      assertEquals(MyObserver.tr2.getMax(), range2.getMax());
+      for (Object r2 : results1) {
+        assertTrue(r2 instanceof Result);
+      }
+      checkRowValue(table, ROW, Bytes.toBytes("abcc"));
+    }
+  }
+
+  private void checkRowValue(Table table, byte[] row, byte[] expectedValue) throws IOException
{
+    Get get = new Get(row).addColumn(TEST_FAMILY, QUAL);
+    Result result = table.get(get);
+    byte[] actualValue = result.getValue(TEST_FAMILY, QUAL);
+    assertArrayEquals(expectedValue, actualValue);
+  }
+}


Mime
View raw message