accumulo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] keith-turner closed pull request #645: Add space aware volume chooser
Date Tue, 02 Oct 2018 14:49:43 GMT
keith-turner closed pull request #645: Add space aware volume chooser
URL: https://github.com/apache/accumulo/pull/645
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
new file mode 100644
index 0000000000..2982459d90
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A {@link PreferredVolumeChooser} that takes remaining HDFS space into account when making a
+ * volume choice rather than a simpler round robin. The list of volumes to use can be limited using
+ * the same properties as {@link PreferredVolumeChooser}.
+ *
+ * <p>
+ * Each candidate volume is weighted by its fraction of free space ({@code remaining / capacity},
+ * as reported by {@link FileSystem#getStatus()}), and one volume is drawn at random with
+ * probability proportional to that weight. Weights are cached per list of candidate volumes and
+ * recomputed after the interval named by {@link #HDFS_SPACE_RECOMPUTE_INTERVAL}.
+ */
+public class SpaceAwareVolumeChooser extends PreferredVolumeChooser {
+
+  /**
+   * Property holding how long, in milliseconds, computed volume weights are cached before the file
+   * system status is queried again. When unset or blank, a default of 5 minutes is used.
+   */
+  public static final String HDFS_SPACE_RECOMPUTE_INTERVAL = Property.GENERAL_ARBITRARY_PROP_PREFIX
+      .getKey() + "spaceaware.volume.chooser.recompute.interval";
+
+  // Default time to cache computed weights, in ms (5 minutes).
+  private static final long DEFAULT_COMPUTATION_CACHE_DURATION = 300000;
+
+  // Built lazily by getCache(); keyed by the list of candidate volumes.
+  LoadingCache<List<String>,WeightedRandomCollection> choiceCache = null;
+
+  private static final Logger log = LoggerFactory.getLogger(SpaceAwareVolumeChooser.class);
+
+  /**
+   * Narrows {@code options} to the preferred volumes, then draws one of them at random, weighted by
+   * the fraction of free space on each.
+   *
+   * <p>
+   * If no candidate volume ends up with a positive weight, the {@link IllegalStateException} thrown
+   * while computing weights surfaces wrapped by the cache (Guava wraps unchecked loader exceptions
+   * in {@code UncheckedExecutionException}).
+   *
+   * @throws IllegalStateException if the cache reports a checked {@link ExecutionException}
+   */
+  @Override
+  public String choose(VolumeChooserEnvironment env, String[] options)
+      throws VolumeChooserException {
+
+    options = getPreferredVolumes(env, options);
+
+    try {
+      return getCache(env).get(Arrays.asList(options)).next();
+    } catch (ExecutionException e) {
+      throw new IllegalStateException("Execution exception when attempting to cache choice", e);
+    }
+  }
+
+  /**
+   * Returns the weight cache, building it on first use. The expiry interval is read once from the
+   * system configuration at that point.
+   */
+  private synchronized LoadingCache<List<String>,WeightedRandomCollection> getCache(
+      VolumeChooserEnvironment env) {
+
+    if (choiceCache == null) {
+      ServerConfigurationFactory scf = loadConfFactory(env);
+      AccumuloConfiguration systemConfiguration = scf.getSystemConfiguration();
+      String propertyValue = systemConfiguration.get(HDFS_SPACE_RECOMPUTE_INTERVAL);
+
+      // Trim so that surrounding whitespace in the configured value does not fail parsing.
+      long computationCacheDuration = StringUtils.isNotBlank(propertyValue)
+          ? Long.parseLong(propertyValue.trim())
+          : DEFAULT_COMPUTATION_CACHE_DURATION;
+
+      // NOTE(review): the loader captures the env passed on this first call; later calls with a
+      // different env reuse it when weights are recomputed. Confirm that is intended.
+      choiceCache = CacheBuilder.newBuilder()
+          .expireAfterWrite(computationCacheDuration, TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<List<String>,WeightedRandomCollection>() {
+            @Override
+            public WeightedRandomCollection load(List<String> key) {
+              return new WeightedRandomCollection(key, env);
+            }
+          });
+    }
+
+    return choiceCache;
+  }
+
+  /**
+   * A set of volumes, each weighted by its fraction of free space, from which one volume can be
+   * drawn at random in proportion to its weight.
+   */
+  public class WeightedRandomCollection {
+    // Maps each running (cumulative) weight total to the volume whose weight ends at that total,
+    // so a uniform draw in [0, total) resolves to a volume with a single higherKey lookup.
+    private final NavigableMap<Double,String> map = new TreeMap<>();
+    private final Random random;
+    private double total = 0;
+
+    /**
+     * Queries the file system status of every option and weights it by its fraction of free space.
+     * Options whose status cannot be read, that report a non-positive capacity, or that have no
+     * free space are skipped.
+     *
+     * @param options candidate volume URIs
+     * @param env environment used to reach the server's volume manager
+     * @throws IllegalStateException if {@code options} is empty or no option ends up with a
+     *         positive weight
+     */
+    public WeightedRandomCollection(List<String> options, VolumeChooserEnvironment env) {
+      this.random = new Random();
+
+      if (options.isEmpty()) {
+        throw new IllegalStateException("Options was empty! No valid volumes to choose from.");
+      }
+
+      VolumeManager manager = env.getServerContext().getVolumeManager();
+
+      // Compute percentage space available on each volume
+      for (String option : options) {
+        FileSystem pathFs = manager.getVolumeByPath(new Path(option)).getFileSystem();
+        try {
+          FsStatus optionStatus = pathFs.getStatus();
+          long remaining = optionStatus.getRemaining();
+          long capacity = optionStatus.getCapacity();
+          if (capacity <= 0) {
+            // Dividing by a zero capacity would yield NaN/Infinity and corrupt the running total.
+            log.warn("File system for {} reported capacity {}. Not adding it", option,
+                capacity);
+            continue;
+          }
+          add((double) remaining / capacity, option);
+        } catch (IOException e) {
+          log.error("Unable to get file system status for {}", option, e);
+        }
+      }
+
+      if (map.isEmpty()) {
+        throw new IllegalStateException(
+            "Weighted options was empty! Could indicate an issue getting file system status or "
+                + "no free space on any volume");
+      }
+    }
+
+    /**
+     * Adds {@code result} with the given weight. Weights that are not positive, or not finite, are
+     * ignored because they cannot form a valid interval in the draw.
+     *
+     * @param weight relative weight of {@code result}
+     * @param result the value returned by {@link #next()} when this interval is drawn
+     * @return this collection, for chaining
+     */
+    public WeightedRandomCollection add(double weight, String result) {
+      if (weight <= 0) {
+        log.info("Weight was 0. Not adding {}", result);
+        return this;
+      }
+      if (Double.isNaN(weight) || Double.isInfinite(weight)) {
+        log.warn("Weight {} is not a finite number. Not adding {}", weight, result);
+        return this;
+      }
+      total += weight;
+      map.put(total, result);
+      return this;
+    }
+
+    /**
+     * Draws one value at random, with probability proportional to its weight.
+     *
+     * @return the drawn value
+     */
+    public String next() {
+      double value = random.nextDouble() * total;
+      Double key = map.higherKey(value);
+      // Defensive: if floating-point rounding ever makes value equal to total, there is no higher
+      // key; that draw belongs to the last interval.
+      if (key == null) {
+        key = map.lastKey();
+      }
+      return map.get(key);
+    }
+  }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
new file mode 100644
index 0000000000..1b4f032744
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+public class SpaceAwareVolumeChooserTest {
+  VolumeManager volumeManager = null;
+  VolumeChooserEnvironment chooserEnv = null;
+  ServerContext serverContext = null;
+  ServerConfigurationFactory serverConfigurationFactory = null;
+  AccumuloConfiguration sysConfig = null;
+  Volume vol1 = null;
+  Volume vol2 = null;
+  FileSystem fs1 = null;
+  FileSystem fs2 = null;
+  FsStatus status1 = null;
+  FsStatus status2 = null;
+
+  int iterations = 1000;
+
+  String volumeOne = "hdfs://nn1:8020/apps/accumulo1/tables";
+  String volumeTwo = "hdfs://nn2:8020/applications/accumulo/tables";
+
+  // Different volumes with different paths
+  String[] tableDirs = {volumeOne, volumeTwo};
+
+  int vol1Count = 0;
+  int vol2Count = 0;
+
+  @Before
+  public void beforeTest() {
+    volumeManager = EasyMock.createMock(VolumeManager.class);
+    serverContext = EasyMock.createMock(ServerContext.class);
+    serverConfigurationFactory = EasyMock.createMock(ServerConfigurationFactory.class);
+    sysConfig = EasyMock.createMock(AccumuloConfiguration.class);
+    vol1 = EasyMock.createMock(Volume.class);
+    vol2 = EasyMock.createMock(Volume.class);
+    fs1 = EasyMock.createMock(FileSystem.class);
+    fs2 = EasyMock.createMock(FileSystem.class);
+    status1 = EasyMock.createMock(FsStatus.class);
+    status2 = EasyMock.createMock(FsStatus.class);
+    chooserEnv = new VolumeChooserEnvironment(VolumeChooserEnvironment.ChooserScope.DEFAULT,
+        serverContext);
+
+  }
+
+  private void testSpecificSetup(long percentage1, long percentage2, String cacheDuration,
+      int timesToCallPreferredVolumeChooser, boolean anyTimes) throws IOException {
+    int max = iterations + 1;
+    int min = 1;
+    int updatePropertyMax = timesToCallPreferredVolumeChooser + iterations;
+    if (anyTimes) {
+      max = iterations + 1;
+      updatePropertyMax = max + 1;
+    }
+    // Volume 1 is percentage1 full
+    EasyMock.expect(status1.getRemaining()).andReturn(percentage1).times(min, max);
+    EasyMock.expect(status1.getCapacity()).andReturn(100L).times(min, max);
+
+    // Volume 2 is percentage2 full
+    EasyMock.expect(status2.getRemaining()).andReturn(percentage2).times(min, max);
+    EasyMock.expect(status2.getCapacity()).andReturn(100L).times(min, max);
+
+    EasyMock.expect(sysConfig.get(SpaceAwareVolumeChooser.HDFS_SPACE_RECOMPUTE_INTERVAL))
+        .andReturn(cacheDuration).times(1);
+    EasyMock
+        .expect(sysConfig.get(PreferredVolumeChooser
+            .getPropertyNameForScope(VolumeChooserEnvironment.ChooserScope.DEFAULT)))
+        .andReturn(String.join(",", tableDirs)).times(timesToCallPreferredVolumeChooser);
+
+    EasyMock.expect(serverContext.getVolumeManager()).andReturn(volumeManager).times(min,
+        Math.max(max, updatePropertyMax));
+    EasyMock.expect(serverContext.getServerConfFactory()).andReturn(serverConfigurationFactory)
+        .times(min, updatePropertyMax);
+    EasyMock.expect(serverConfigurationFactory.getSystemConfiguration()).andReturn(sysConfig)
+        .times(1, updatePropertyMax);
+
+    EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeOne))).andReturn(vol1).times(min,
+        max);
+    EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeTwo))).andReturn(vol2).times(min,
+        max);
+    EasyMock.expect(vol1.getFileSystem()).andReturn(fs1).times(min, max);
+    EasyMock.expect(vol2.getFileSystem()).andReturn(fs2).times(min, max);
+    EasyMock.expect(fs1.getStatus()).andReturn(status1).times(min, max);
+    EasyMock.expect(fs2.getStatus()).andReturn(status2).times(min, max);
+
+    EasyMock.replay(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+        serverConfigurationFactory, sysConfig);
+  }
+
+  @After
+  public void afterTest() {
+
+    EasyMock.verify(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+        serverConfigurationFactory, sysConfig);
+
+    volumeManager = null;
+    serverContext = null;
+    vol1 = null;
+    vol2 = null;
+    fs1 = null;
+    fs2 = null;
+    status1 = null;
+    status2 = null;
+    vol1Count = 0;
+    vol2Count = 0;
+  }
+
+  @Test
+  public void testEvenWeightsWithCaching() throws IOException {
+
+    testSpecificSetup(10L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testEvenWeightsNoCaching() throws IOException {
+
+    testSpecificSetup(10L, 10L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+  }
+
+  @Test(expected = UncheckedExecutionException.class)
+  public void testNoFreeSpace() throws IOException {
+
+    testSpecificSetup(0L, 0L, null, 1, false);
+
+    makeChoices();
+  }
+
+  @Test
+  public void testNinetyTen() throws IOException {
+
+    testSpecificSetup(90L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .9, vol1Count, iterations / 10);
+    assertEquals(iterations * .1, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testTenNinety() throws IOException {
+
+    testSpecificSetup(10L, 90L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testWithNoCaching() throws IOException {
+
+    testSpecificSetup(10L, 90L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+  }
+
+  private void makeChoices() {
+    SpaceAwareVolumeChooser chooser = new SpaceAwareVolumeChooser();
+    for (int i = 0; i < iterations; i++) {
+      String choice = chooser.choose(chooserEnv, tableDirs);
+      if (choice.equals(volumeOne)) {
+        vol1Count += 1;
+      }
+
+      if (choice.equals(volumeTwo)) {
+        vol2Count += 1;
+      }
+    }
+
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message