hbase-commits mailing list archives

From: ramkris...@apache.org
Subject: hbase git commit: HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the Puts (Ram)
Date: Mon, 11 Jul 2016 06:48:16 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 60ff6128e -> ecc1a886e


HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the Puts (Ram)

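For context, the attributes in question are set on the client side before the Puts reach the reducer. Below is a minimal sketch against the 0.98 client API (the row key, family/qualifier/value bytes, TTL value, the "bob" principal, and the visibility expression are all illustrative); with this patch, PutSortReducer converts each of these attributes into cell tags in the written HFiles instead of dropping them.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;

public class AttributedPutSketch {
  // Build a Put carrying TTL/ACL/visibility attributes; the patched
  // PutSortReducer carries these through as tags on the emitted KeyValues.
  static Put attributedPut() {
    Put p = new Put(Bytes.toBytes("row1"));                      // illustrative row key
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    p.setTTL(600000L);                                           // becomes a TTL_TAG_TYPE tag
    p.setACL("bob", new Permission(Permission.Action.READ));     // becomes an ACL_TAG_TYPE tag
    p.setCellVisibility(new CellVisibility("secret|topsecret")); // resolved into visibility tags
    return p;
  }
}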

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecc1a886
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecc1a886
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecc1a886

Branch: refs/heads/0.98
Commit: ecc1a886e4dd8747595a82d17eb805eabf26c20b
Parents: 60ff612
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Mon Jul 11 12:17:31 2016 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Mon Jul 11 12:17:31 2016 +0530

----------------------------------------------------------------------
 .../DefaultVisibilityExpressionResolver.java    |   7 +-
 .../hadoop/hbase/mapreduce/PutSortReducer.java  |  61 +++++++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |   8 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 147 ++++++++++++++-----
 4 files changed, 179 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc1a886/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
index 1954f68..39d6898 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
@@ -70,10 +70,6 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
     HTable labelsTable = null;
     try {
       labelsTable = new HTable(conf, LABELS_TABLE_NAME);
-    } catch (TableNotFoundException e) {
-      // Just return with out doing any thing. When the VC is not used we wont be having 'labels'
-      // table in the cluster.
-      return;
     } catch (IOException e) {
       LOG.error("Error opening 'labels' table", e);
       return;
@@ -90,6 +86,9 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
         byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
         labels.put(Bytes.toString(value), Bytes.toInt(row));
       }
+    } catch (TableNotFoundException e) {
+      // Table not found. So just return
+      return;
     } catch (IOException e) {
       LOG.error("Error reading 'labels' table", e);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc1a886/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index 792686a..4160cdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -18,17 +18,25 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 
@@ -44,7 +52,17 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceStability.Stable
 public class PutSortReducer extends
     Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
-  
+  // the cell creator
+  private CellCreator kvCreator;
+
+  @Override
+  protected void
+      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+          throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.kvCreator = new CellCreator(conf);
+  }
+
   @Override
   protected void reduce(
       ImmutableBytesWritable row,
@@ -60,12 +78,51 @@ public class PutSortReducer extends
     while (iter.hasNext()) {
       TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
       long curSize = 0;
+      List<Tag> tags = new ArrayList<Tag>();
       // stop at the end or the RAM threshold
       while (iter.hasNext() && curSize < threshold) {
+        tags.clear();
         Put p = iter.next();
+        long t = p.getTTL();
+        if (t != Long.MAX_VALUE) {
+          // add TTL tag if found
+          tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
+        }
+        byte[] acl = p.getACL();
+        if (acl != null) {
+          // add ACL tag if found
+          tags.add(new Tag(TagType.ACL_TAG_TYPE, acl));
+        }
+        try {
+          CellVisibility cellVisibility = p.getCellVisibility();
+          if (cellVisibility != null) {
+            // add the visibility labels if any
+            tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibility.getExpression()));
+          }
+        } catch (DeserializationException e) {
+          // We just throw exception here. Should we allow other mutations to proceed by
+          // just ignoring the bad one?
+          throw new IOException("Invalid visibility expression found in mutation " + p, e);
+        }
         for (List<Cell> cells: p.getFamilyCellMap().values()) {
           for (Cell cell: cells) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            // Creating the KV which needs to be directly written to HFiles. Using the Facade
+            // KVCreator for creation of kvs.
+            KeyValue kv = null;
+            if (cell.getTagsLengthUnsigned() != 0) {
+              tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+                cell.getTagsLengthUnsigned()));
+            }
+            if (!tags.isEmpty()) {
+              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
+                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
+                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength(), tags);
+            } else {
+              kv = KeyValueUtil.ensureKeyValue(cell);
+            }
             if (map.add(kv)) {// don't count duplicated kv into size
               curSize += kv.heapSize();
             }

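PutSortReducer is not usually wired up by hand: HFileOutputFormat2.configureIncrementalLoad() selects it whenever the job's map output value class is Put. A minimal job-wiring sketch (MyPutEmittingMapper is a hypothetical mapper emitting the attributed Puts shown earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadJobSketch {
  static Job configure(Configuration conf, HTable table, Path outDir) throws Exception {
    Job job = new Job(conf, "bulkload-with-tags");
    job.setMapperClass(MyPutEmittingMapper.class);       // hypothetical Put-emitting mapper
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);               // Put => PutSortReducer is selected
    HFileOutputFormat2.configureIncrementalLoad(job, table); // sets reducer, partitioner, output format
    FileOutputFormat.setOutputPath(job, outDir);
    return job;
  }
}
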
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc1a886/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 168ba40..cb329f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -99,9 +99,8 @@ public class TextSortReducer extends
    */
   @Override
   protected void setup(Context context) {
-    doSetup(context);
-
     Configuration conf = context.getConfiguration();
+    doSetup(context, conf);
 
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
     if (parser.getRowKeyColumnIndex() == -1) {
@@ -113,10 +112,9 @@ public class TextSortReducer extends
   /**
    * Handles common parameter initialization that a subclass might want to leverage.
    * @param context
+   * @param conf
    */
-  protected void doSetup(Context context) {
-    Configuration conf = context.getConfiguration();
-
+  protected void doSetup(Context context, Configuration conf) {
     // If a custom separator has been used,
     // decode it back from Base64 encoding.
     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);

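Note that the doSetup() signature change above is source-incompatible for subclasses. A hypothetical subclass would be updated along these lines (CustomTextSortReducer and its extra initialization are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;

public class CustomTextSortReducer extends TextSortReducer {
  @Override
  protected void doSetup(Context context, Configuration conf) {
    super.doSetup(context, conf);  // parses the ImportTsv separator and related settings
    // subclass-specific initialization reading from conf would go here
  }
}
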
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc1a886/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index a7f178b..76fb516 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -33,7 +33,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
 import java.util.Stack;
@@ -74,6 +73,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -170,11 +170,78 @@ public class TestHFileOutputFormat2  {
     }
   }
 
-  private void setupRandomGeneratorMapper(Job job) {
-    job.setInputFormatClass(NMapInputFormat.class);
-    job.setMapperClass(RandomKVGeneratingMapper.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    job.setMapOutputValueClass(KeyValue.class);
+  /**
+   * Simple mapper that makes Put output.
+   */
+  static class RandomPutGeneratingMapper
+      extends Mapper<NullWritable, NullWritable,
+                 ImmutableBytesWritable, Put> {
+
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT=10;
+    private static final String KEYLEN_CONF="randomkv.key.length";
+
+    private int valLength;
+    private static final int VALLEN_DEFAULT=10;
+    private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable,Put>.Context context)
+        throws java.io.IOException ,InterruptedException
+    {
+
+      byte keyBytes[] = new byte[keyLength];
+      byte valBytes[] = new byte[valLength];
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      Random random = new Random();
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+        random.nextBytes(keyBytes);
+        // Ensure that unique tasks generate unique keys
+        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+        random.nextBytes(valBytes);
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+          Put p = new Put(keyBytes);
+          p.add(family, QUALIFIER, valBytes);
+          // set TTL to very low so that the scan does not return any value
+          p.setTTL(1l);
+          context.write(key, p);
+        }
+      }
+    }
+  }
+
+  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
+    if (putSortReducer) {
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomPutGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(Put.class);
+    } else {
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomKVGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+    }
   }
 
   /**
@@ -312,7 +379,7 @@ public class TestHFileOutputFormat2  {
     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
 
     Job job = new Job(conf, "testWritingPEData");
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, false);
     // This partitioner doesn't work well for number keys but using it anyways
     // just to demonstrate how to configure it.
     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -434,13 +501,13 @@ public class TestHFileOutputFormat2  {
   @Test
   public void testMRIncrementalLoad() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoad\n");
-    doIncrementalLoadTest(false, false);
+    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
   }
 
   @Test
   public void testMRIncrementalLoadWithSplit() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true, false);
+    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
   }
 
   /**
@@ -454,15 +521,22 @@ public class TestHFileOutputFormat2  {
   @Test
   public void testMRIncrementalLoadWithLocality() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
-    doIncrementalLoadTest(false, true);
-    doIncrementalLoadTest(true, true);
+    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
+    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
   }
 
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
-      throws Exception {
+  @Test
+  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
+    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+      boolean putSortReducer, String tableStr) throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+    conf.setInt("hfile.format.version", 3);
     int hostCount = 1;
     int regionNum = 5;
     if (shouldKeepLocality) {
@@ -491,7 +565,7 @@ public class TestHFileOutputFormat2  {
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
-      runIncrementalPELoad(conf, table, testDir);
+      runIncrementalPELoad(conf, table, testDir, putSortReducer);
       // This doesn't write into the table, just makes files
       assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
 
@@ -528,21 +602,28 @@ public class TestHFileOutputFormat2  {
       // Perform the actual load
       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
 
-      // Ensure data shows up
-      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-        util.countRows(table));
-      Scan scan = new Scan();
-      ResultScanner results = table.getScanner(scan);
-      for (Result res : results) {
-        assertEquals(FAMILIES.length, res.rawCells().length);
-        Cell first = res.rawCells()[0];
-        for (Cell kv : res.rawCells()) {
-          assertTrue(CellUtil.matchingRow(first, kv));
-          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+      int expectedRows = 0;
+      if (putSortReducer) {
+        // no rows should be extracted
+        assertEquals("LoadIncrementalHFiles should not get the expected data in table",
+          expectedRows, util.countRows(table));
+      } else {
+        // Ensure data shows up
+        expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+        assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+          util.countRows(table));
+        Scan scan = new Scan();
+        ResultScanner results = table.getScanner(scan);
+        for (Result res : results) {
+          assertEquals(FAMILIES.length, res.rawCells().length);
+          Cell first = res.rawCells()[0];
+          for (Cell kv : res.rawCells()) {
+            assertTrue(CellUtil.matchingRow(first, kv));
+            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+          }
         }
+        results.close();
       }
-      results.close();
       String tableDigestBefore = util.checksumRows(table);
 
       // Check region locality
@@ -576,14 +657,14 @@ public class TestHFileOutputFormat2  {
   }
 
   private void runIncrementalPELoad(
-      Configuration conf, HTable table, Path outDir)
+      Configuration conf, HTable table, Path outDir, boolean putSortReducer)
   throws Exception {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, putSortReducer);
     HFileOutputFormat2.configureIncrementalLoad(job, table);
     FileOutputFormat.setOutputPath(job, outDir);
 
@@ -935,7 +1016,7 @@ public class TestHFileOutputFormat2  {
 
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
-      setupRandomGeneratorMapper(job);
+      setupRandomGeneratorMapper(job, false);
       HFileOutputFormat2.configureIncrementalLoad(job, table);
       FileOutputFormat.setOutputPath(job, dir);
       context = createTestTaskAttemptContext(job);
@@ -1039,7 +1120,7 @@ public class TestHFileOutputFormat2  {
 
       for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
-        runIncrementalPELoad(conf, table, testDir);
+        runIncrementalPELoad(conf, table, testDir, false);
         // Perform the actual load
         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
       }
@@ -1116,7 +1197,7 @@ public class TestHFileOutputFormat2  {
       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
           true);
       util.startMiniMapReduceCluster();
-      runIncrementalPELoad(conf, table, testDir);
+      runIncrementalPELoad(conf, table, testDir, false);
 
       // Perform the actual load
       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
@@ -1187,7 +1268,7 @@ public class TestHFileOutputFormat2  {
       byte[] tname = args[1].getBytes();
       HTable table = new HTable(conf, tname);
       Path outDir = new Path("incremental-out");
-      runIncrementalPELoad(conf, table, outDir);
+      runIncrementalPELoad(conf, table, outDir, false);
     } else {
       throw new RuntimeException(
           "usage: TestHFileOutputFormat2 newtable | incremental");

