accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hkeeb...@apache.org
Subject [accumulo] branch master updated: Fix #1043 Support stable ~del split points (#1344)
Date Tue, 17 Sep 2019 14:53:53 GMT
This is an automated email from the ASF dual-hosted git repository.

hkeebler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5ddc8  Fix #1043 Support stable ~del split points (#1344)
0c5ddc8 is described below

commit 0c5ddc8a4ef449ef546a77986d81127fccef67d2
Author: hkeebler <49656678+hkeebler@users.noreply.github.com>
AuthorDate: Tue Sep 17 10:53:48 2019 -0400

    Fix #1043 Support stable ~del split points (#1344)
    
    * Fix #1043 Support stable ~del split points
     includes recommended changes
---
 .../core/metadata/schema/MetadataSchema.java       | 11 ++++-
 .../accumulo/core/metadata/schema/SortSkew.java    | 48 ++++++++++++++++++++
 .../core/metadata/schema/DeleteMetadataTest.java   | 35 +++++++++++++++
 .../core/metadata/schema/SortSkewTest.java         | 52 ++++++++++++++++++++++
 .../accumulo/server/metadata/ServerAmpleImpl.java  | 11 ++---
 .../accumulo/server/util/ListVolumesUsed.java      | 33 ++++++--------
 .../test/functional/GarbageCollectorIT.java        | 13 +++---
 7 files changed, 168 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 9d23ae0..b2fdede 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -255,12 +255,19 @@ public class MetadataSchema {
     private static final Section section =
         new Section(RESERVED_PREFIX + "del", true, RESERVED_PREFIX + "dem", false);
 
+    private static final int encoded_prefix_length =
+        section.getRowPrefix().length() + SortSkew.SORTSKEW_LENGTH;
+
     public static Range getRange() {
       return section.getRange();
     }
 
-    public static String getRowPrefix() {
-      return section.getRowPrefix();
+    public static String encodeRow(String value) {
+      return section.getRowPrefix() + SortSkew.getCode(value) + value;
+    }
+
+    public static String decodeRow(String row) {
+      return row.substring(encoded_prefix_length);
     }
 
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java
new file mode 100644
index 0000000..665139c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SortSkew.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.metadata.schema;
+
+import static com.google.common.hash.Hashing.murmur3_32;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Strings;
+
+/*
+ * A subprefix used to remove sort skew from some of the metadata generated entries, for example: file deletes
+ * prefixed with ~del.  NOTE:  This is persisted data so any change to this processing should
+ * consider any existing data.
+ */
+public class SortSkew {
+
+  // A specified length for the skew code used is necessary to parse the key correctly.
+  // The Hex code for an integer will always be <= 8
+  public static final int SORTSKEW_LENGTH = Integer.BYTES * 2;
+
+  /**
+   * Creates a left justified hex string for the path hashcode of a deterministic length, therefore
+   * if necessary it is right padded with zeros
+   *
+   * @param keypart
+   *          value to be coded
+   * @return coded value of keypart
+   */
+  public static String getCode(String keypart) {
+    int hashCode = murmur3_32().hashString(keypart, UTF_8).asInt();
+    return Strings.padStart(Integer.toHexString(hashCode), SORTSKEW_LENGTH, '0');
+  }
+
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java
new file mode 100644
index 0000000..4890ab4
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/DeleteMetadataTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class DeleteMetadataTest {
+
+  @Test
+  public void encodeRowTest() {
+    String path = "/dir/testpath";
+    assertEquals(path,
+        MetadataSchema.DeletesSection.decodeRow(MetadataSchema.DeletesSection.encodeRow(path)));
+    path = "hdfs://localhost:8020/dir/r+/1_table/f$%#";
+    assertEquals(path,
+        MetadataSchema.DeletesSection.decodeRow(MetadataSchema.DeletesSection.encodeRow(path)));
+
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java
new file mode 100644
index 0000000..d71163d
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SortSkewTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.metadata.schema;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+public class SortSkewTest {
+  private static final String shortpath = "1";
+  private static final String longpath =
+      "/verylongpath/12345679xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxiiiiiiiiiiiiiiiiii/zzzzzzzzzzzzzzzzzzzzz"
+          + "aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyzzzzzzzzzzzzzzzz";;
+  // these are values previously generated from SortSkew.getCode() for the above
+  private static final String shortcode = "9416ac93";
+  private static final String longcode = "b9ddf266";
+
+  @Test
+  public void verifyCodeSize() {
+    int expectedLength = SortSkew.SORTSKEW_LENGTH;
+    assertEquals(expectedLength, SortSkew.getCode(shortpath).length());
+    assertEquals(expectedLength, SortSkew.getCode(longpath).length());
+  }
+
+  @Test
+  public void verifySame() {
+    assertEquals(SortSkew.getCode("123"), SortSkew.getCode("123"));
+    assertNotEquals(SortSkew.getCode("123"), SortSkew.getCode("321"));
+  }
+
+  @Test
+  public void verifyStable() {
+    assertEquals(shortcode, SortSkew.getCode(shortpath));
+    assertEquals(longcode, SortSkew.getCode(longpath));
+  }
+
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index ce5f4f3..df59341 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -134,7 +134,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
 
     try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
       for (String path : paths) {
-        Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + path);
+        Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(path));
         m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
         writer.addMutation(m);
       }
@@ -157,7 +157,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
     } else if (level == DataLevel.METADATA || level == DataLevel.USER) {
       Range range = MetadataSchema.DeletesSection.getRange();
       if (continuePoint != null && !continuePoint.isEmpty()) {
-        String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint;
+        String continueRow = MetadataSchema.DeletesSection.encodeRow(continuePoint);
         range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true,
             range.getEndKey(), range.isEndKeyInclusive());
       }
@@ -170,8 +170,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
       }
       scanner.setRange(range);
 
-      return Iterators.transform(scanner.iterator(), entry -> entry.getKey().getRow().toString()
-          .substring(MetadataSchema.DeletesSection.getRowPrefix().length()));
+      return Iterators.transform(scanner.iterator(),
+          entry -> MetadataSchema.DeletesSection.decodeRow(entry.getKey().getRow().toString()));
 
     } else {
       throw new IllegalArgumentException();
@@ -196,7 +196,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
   public static Mutation createDeleteMutation(ServerContext context, TableId tableId,
       String pathToRemove) {
     Path path = context.getVolumeManager().getFullPath(tableId, pathToRemove);
-    Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path));
+    Mutation delFlag =
+        new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path.toString())));
     delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
     return delFlag;
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 5e758a2..1417dd5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.util;
 
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.TreeSet;
 
@@ -23,8 +24,8 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.security.Authorizations;
@@ -78,11 +79,11 @@ public class ListVolumesUsed {
 
   }
 
-  private static void listTable(String name, ServerContext context) throws Exception {
+  private static void listTable(Ample.DataLevel level, ServerContext context) throws Exception {
 
-    System.out.println("Listing volumes referenced in " + name + " tablets section");
+    System.out.println("Listing volumes referenced in " + level + " tablets section");
 
-    Scanner scanner = context.createScanner(name, Authorizations.EMPTY);
+    Scanner scanner = context.createScanner(level.metaTable(), Authorizations.EMPTY);
 
     scanner.setRange(MetadataSchema.TabletsSection.getRange());
     scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
@@ -109,33 +110,25 @@ public class ListVolumesUsed {
       System.out.println("\tVolume : " + volume);
     }
 
+    System.out.println("Listing volumes referenced in " + level
+        + " deletes section (volume replacement occurrs at deletion time)");
     volumes.clear();
 
-    scanner.clearColumns();
-    scanner.setRange(MetadataSchema.DeletesSection.getRange());
-
-    for (Entry<Key,Value> entry : scanner) {
-      String delPath = entry.getKey().getRow().toString()
-          .substring(MetadataSchema.DeletesSection.getRowPrefix().length());
-      volumes.add(getTableURI(delPath));
+    Iterator<String> delPaths = context.getAmple().getGcCandidates(level, "");
+    while (delPaths.hasNext()) {
+      volumes.add(getTableURI(delPaths.next()));
     }
-
-    System.out.println("Listing volumes referenced in " + name
-        + " deletes section (volume replacement occurrs at deletion time)");
-
     for (String volume : volumes) {
       System.out.println("\tVolume : " + volume);
     }
 
+    System.out.println("Listing volumes referenced in " + level + " current logs");
     volumes.clear();
 
     WalStateManager wals = new WalStateManager(context);
     for (Path path : wals.getAllState().keySet()) {
       volumes.add(getLogURI(path.toString()));
     }
-
-    System.out.println("Listing volumes referenced in " + name + " current logs");
-
     for (String volume : volumes) {
       System.out.println("\tVolume : " + volume);
     }
@@ -144,9 +137,9 @@ public class ListVolumesUsed {
   public static void listVolumes(ServerContext context) throws Exception {
     listZookeeper(context);
     System.out.println();
-    listTable(RootTable.NAME, context);
+    listTable(Ample.DataLevel.METADATA, context);
     System.out.println();
-    listTable(MetadataTable.NAME, context);
+    listTable(Ample.DataLevel.USER, context);
   }
 
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 1856efb..7080cf6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -186,7 +186,7 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
   }
 
   private Mutation createDelMutation(String path, String cf, String cq, String val) {
-    Text row = new Text(MetadataSchema.DeletesSection.getRowPrefix() + path);
+    Text row = new Text(MetadataSchema.DeletesSection.encodeRow(path));
     Mutation delFlag = new Mutation(row);
     delFlag.put(cf, cq, val);
     return delFlag;
@@ -297,18 +297,15 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
     return Iterators.size(Arrays.asList(cluster.getFileSystem().globStatus(path)).iterator());
   }
 
-  public static void addEntries(AccumuloClient client) throws Exception {
+  private void addEntries(AccumuloClient client) throws Exception {
     client.securityOperations().grantTablePermission(client.whoami(), MetadataTable.NAME,
         TablePermission.WRITE);
     try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) {
       for (int i = 0; i < 100000; ++i) {
         final Text emptyText = new Text("");
-        Text row =
-            new Text(String.format("%s/%020d/%s", MetadataSchema.DeletesSection.getRowPrefix(), i,
-                "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
-                    + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"));
-        Mutation delFlag = new Mutation(row);
-        delFlag.put(emptyText, emptyText, new Value(new byte[] {}));
+        String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
+            + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj";
+        Mutation delFlag = createDelMutation(String.format("/%020d/%s", i, longpath), "", "", "");
         bw.addMutation(delFlag);
       }
     }


Mime
View raw message