accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-3263 utility to redirect tablets to use volumes fairly
Date Tue, 28 Oct 2014 21:14:03 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 b0771ef66 -> 2f2bf66d9


ACCUMULO-3263 utility to redirect tablets to use volumes fairly


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2f2bf66d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2f2bf66d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2f2bf66d

Branch: refs/heads/1.6
Commit: 2f2bf66d9797e36b10221910c81066645d115275
Parents: b0771ef
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Oct 28 17:12:17 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Oct 28 17:13:38 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/server/util/Admin.java  |  12 ++
 .../accumulo/server/util/RandomizeVolumes.java  | 127 ++++++++++++++++
 .../test/RewriteTabletDirectoriesIT.java        | 143 +++++++++++++++++++
 .../java/org/apache/accumulo/test/VolumeIT.java |   1 -
 4 files changed, 282 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f2bf66d/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 224f786..5e7cb3e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -123,6 +123,12 @@ public class Admin {
     @Parameter(names = {"-d", "--directory"}, description = "directory to place config files")
     String directory = null;
   }
+  
+  @Parameters(commandDescription = "redistribute tablet directories across the current volume
list")
+  static class RandomizeVolumesCommand {
+    @Parameter(names={"-t"}, description = "table to update", required=true)
+    String table = null;
+  }
 
   public static void main(String[] args) {
     boolean everything;
@@ -152,6 +158,10 @@ public class Admin {
     cl.addCommand("stopAll", stopAllOpts);
     StopMasterCommand stopMasterOpts = new StopMasterCommand();
     cl.addCommand("stopMaster", stopMasterOpts);
+    
+    RandomizeVolumesCommand randomizeVolumesOpts = new RandomizeVolumesCommand();
+    cl.addCommand("randomizeVolumes", randomizeVolumesOpts);
+    
     cl.parse(args);
 
     if (opts.help || cl.getParsedCommand() == null) {
@@ -197,6 +207,8 @@ public class Admin {
         printConfig(instance, principal, token, dumpConfigCommand);
       } else if (cl.getParsedCommand().equals("volumes")) {
         ListVolumesUsed.listVolumes(instance, principal, token);
+      } else if (cl.getParsedCommand().equals("randomizeVolumes")) {
+        RandomizeVolumes.randomize(instance, new Credentials(principal, token), randomizeVolumesOpts.table);
       } else {
         everything = cl.getParsedCommand().equals("stopAll");
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f2bf66d/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
new file mode 100644
index 0000000..197c46c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomizeVolumes.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.log4j.Logger;
+
+public class RandomizeVolumes {
+  private static final Logger log = Logger.getLogger(RandomizeVolumes.class);
+  
+  public static void main(String[] args) {
+    ClientOnRequiredTable opts = new ClientOnRequiredTable();
+    opts.parseArgs(RandomizeVolumes.class.getName(), args);
+    String principal = SystemCredentials.get().getPrincipal();
+    AuthenticationToken token = SystemCredentials.get().getToken();
+    try {
+      int status = randomize(opts.getInstance(), new Credentials(principal, token), opts.getTableName());
+      System.exit(status);
+    } catch (Exception ex) {
+      log.error(ex, ex);
+      System.exit(4);
+    }
+  }
+
+  public static int randomize(Instance instance, Credentials credentials, String tableName)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    SiteConfiguration siteConfiguration = new ServerConfigurationFactory(instance).getSiteConfiguration();
+    String volumesConfig = siteConfiguration.get(Property.INSTANCE_VOLUMES);
+    if (null == volumesConfig || "".equals(volumesConfig)) {
+      log.error(Property.INSTANCE_VOLUMES.getKey() + " is not set in accumulo-site.xml");
+      return 1;
+    }
+    String[] volumes = volumesConfig.split(",");
+    if (volumes.length < 2) {
+      log.error("There are not enough volumes configured: " + volumesConfig);
+      return 2;
+    }
+    Connector c = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
+    String tableId = c.tableOperations().tableIdMap().get(tableName);
+    if (null == tableId) {
+      log.error("Could not determine the table ID for table " + tableName);
+      return 3;
+    }
+    TableState tableState = TableManager.getInstance().getTableState(tableId);
+    if (TableState.OFFLINE != tableState) {
+      log.debug("Taking " + tableName + " offline");
+      c.tableOperations().offline(tableName, true);
+      log.debug(tableName + " offline");
+    }
+    log.debug("Rewriting entries for " + tableName);
+    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    final ColumnFQ DIRCOL = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+    DIRCOL.fetch(scanner);
+    scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    BatchWriter writer = c.createBatchWriter(MetadataTable.NAME, null);
+    int count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      String oldLocation = entry.getValue().toString();
+      String directory;
+      if (oldLocation.contains(":")) {
+        String[] parts = oldLocation.split("/");
+        String tableIdEntry = parts[parts.length - 2];
+        if (!tableIdEntry.equals(tableId)) {
+          log.error("Unexpected table id found: " + tableIdEntry + ", expected " + tableId
+ "; skipping");
+          continue;
+        }
+        directory = parts[parts.length - 1];
+      } else {
+        directory = oldLocation.substring(1);
+      }
+      Mutation m = new Mutation(entry.getKey().getRow());
+      String newLocation = volumes[count % volumes.length] + "/" + ServerConstants.TABLE_DIR
+ "/" + tableId + "/" + directory;
+      m.put(DIRCOL.getColumnFamily(), DIRCOL.getColumnQualifier(), new Value(newLocation.getBytes()));
+      if (log.isTraceEnabled()) {
+        log.trace("Replacing " + oldLocation + " with " + newLocation);
+      }
+      writer.addMutation(m);
+      count++;
+    }
+    writer.close();
+    log.info("Updated " + count + " entries for table " + tableName);
+    if (TableState.OFFLINE != tableState) {
+      c.tableOperations().online(tableName, true);
+      log.info("table " + tableName + " back online");
+    }
+    return 0;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f2bf66d/test/src/test/java/org/apache/accumulo/test/RewriteTabletDirectoriesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/RewriteTabletDirectoriesIT.java b/test/src/test/java/org/apache/accumulo/test/RewriteTabletDirectoriesIT.java
new file mode 100644
index 0000000..4cd4a3e
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/RewriteTabletDirectoriesIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.util.ColumnFQ;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.util.RandomizeVolumes;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// ACCUMULO-3263
+public class RewriteTabletDirectoriesIT extends ConfigurableMacIT {
+  
+  private Path v1, v2;
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    File baseDir = cfg.getDir();
+    File volDirBase = new File(baseDir, "volumes");
+    File v1f = new File(volDirBase, "v1");
+    File v2f = new File(volDirBase, "v2");
+    v1f.mkdir();
+    v2f.mkdir();
+    v1 = new Path("file://" + v1f.getAbsolutePath());
+    v2 = new Path("file://" + v2f.getAbsolutePath());
+
+    // Run MAC on two locations in the local file system
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString());
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  @Test(timeout = 4 * 60 * 1000)
+  public void test() throws Exception {
+    Connector c = getConnector();
+    c.securityOperations().grantTablePermission(c.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+    final String tableName = getUniqueNames(1)[0];
+    final SortedSet<Text> splits = new TreeSet<Text>();
+    for (String split : "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(","))
{
+      splits.add(new Text(split));
+    }
+    c.tableOperations().create(tableName);
+    c.tableOperations().addSplits(tableName, splits);
+    
+    BatchScanner scanner = c.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY,
1);
+    ColumnFQ DC = MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+    DC.fetch(scanner);
+    String tableId = c.tableOperations().tableIdMap().get(tableName);
+    scanner.setRanges(Collections.singletonList(MetadataSchema.TabletsSection.getRange(tableId)));
+    // verify the directory entries are all on v1, make a few entries relative
+    BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, null);
+    int count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertTrue(entry.getValue().toString().contains(v1.toString()));
+      count++;
+      if (count % 2 == 0) {
+        String parts[] = entry.getValue().toString().split("/");
+        Mutation m = new Mutation(entry.getKey().getRow());
+        m.put(DC.getColumnFamily(), DC.getColumnQualifier(), new Value(("/" + parts[parts.length
- 1]).getBytes()));
+        bw.addMutation(m);
+      }
+    }
+    bw.close();
+    assertEquals(splits.size() + 1, count);
+    
+    // This should fail: only one volume
+    assertEquals(2, cluster.exec(RandomizeVolumes.class, "-z", cluster.getZooKeepers(), "-i",
c.getInstance().getInstanceName(), "-t", tableName).waitFor());
+
+    
+    cluster.stop();
+
+    // add the 2nd volume
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(),
"accumulo-site.xml")));
+    conf.writeXml(fos);
+    fos.close();
+    
+    // initialize volume
+    assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+    cluster.start();
+    c = getConnector();
+
+    // change the directory entries
+    assertEquals(0, cluster.exec(Admin.class, "randomizeVolumes", "-t", tableName).waitFor());
+
+    // verify a more equal sharing
+    int v1Count = 0, v2Count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      if (entry.getValue().toString().contains(v1.toString())) {
+        v1Count++;
+      }
+      if (entry.getValue().toString().contains(v2.toString())) {
+        v2Count++;
+      }
+    }
+    assertEquals(splits.size() + 1, v1Count + v2Count);
+    assertTrue(Math.abs(v1Count - v2Count) < 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f2bf66d/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 3af7eb8..a338d5f 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -298,7 +298,6 @@ public class VolumeIT extends ConfigurableMacIT {
     cluster.start();
 
     verifyVolumesUsed(tableNames[1], false, v1, v2, v3);
-
   }
 
   @Test


Mime
View raw message