cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/3] cassandra git commit: improve JBOD disk utilization
Date Wed, 19 Nov 2014 23:31:48 GMT
improve JBOD disk utilization


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2291a60e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2291a60e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2291a60e

Branch: refs/heads/trunk
Commit: 2291a60e9eded4486528acc0a8d12a062b21fc26
Parents: 4397c34
Author: Robert Stupp <snazy@snazy.de>
Authored: Wed Nov 19 16:17:05 2014 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Nov 19 17:17:40 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 -
 .../org/apache/cassandra/db/Directories.java    | 161 ++++++++++++-------
 .../db/compaction/CompactionManager.java        |  12 +-
 .../cassandra/db/compaction/Scrubber.java       |   5 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  14 +-
 .../cassandra/service/StorageService.java       |  20 ---
 .../cassandra/streaming/StreamReader.java       |   3 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   8 +-
 .../apache/cassandra/db/DirectoriesTest.java    | 128 +++++++++++++++
 10 files changed, 252 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41a5aaf..e008ab9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Fix overflow on histogram computation (CASSANDRA-8028)
  * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
  * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
 Merged from 2.0:
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7e1dd18..dec5370 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2240,11 +2240,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories.getSnapshotDetails();
     }
 
-    public boolean hasUnreclaimedSpace()
-    {
-        return getLiveDiskSpaceUsed() < getTotalDiskSpaceUsed();
-    }
-
     public long getTotalDiskSpaceUsed()
     {
         return metric.totalDiskSpaceUsed.count();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 4319481..eb33bd8 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -29,8 +29,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -39,8 +38,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -96,7 +93,6 @@ public class Directories
             dataDirectories[i] = new DataDirectory(new File(locations[i]));
     }
 
-
     /**
      * Checks whether Cassandra has RWX permissions to the specified directory.  Logs an
error with
      * the details if it does not.
@@ -198,7 +194,7 @@ public class Directories
         for (int i = 0; i < dataDirectories.length; ++i)
         {
             // check if old SSTable directory exists
-            dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, this.metadata.cfName));
+            dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, metadata.cfName));
         }
         boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
         {
@@ -237,11 +233,10 @@ public class Directories
      */
     public File getLocationForDisk(DataDirectory dataDirectory)
     {
-        for (File dir : dataPaths)
-        {
-            if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
-                return dir;
-        }
+        if (dataDirectory != null)
+            for (File dir : dataPaths)
+                if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+                    return dir;
         return null;
     }
 
@@ -255,65 +250,96 @@ public class Directories
         return null;
     }
 
+    /**
+     * Basically the same as calling {@link #getWriteableLocationAsFile(long)} with an unknown
size ({@code -1L}),
+     * which may return any non-blacklisted directory - even a data directory that has no
usable space.
+     * Do not use this method in production code.
+     *
+     * @throws IOError if all directories are blacklisted.
+     */
     public File getDirectoryForNewSSTables()
     {
-        File path = getWriteableLocationAsFile();
-
-        // Requesting GC has a chance to free space only if we're using mmap and a non SUN
jvm
-        if (path == null
-            && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap
|| DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-            && !FileUtils.isCleanerAvailable())
-        {
-            logger.info("Forcing GC to free up disk space.  Upgrade to the Oracle JVM to
avoid this");
-            StorageService.instance.requestGC();
-            // retry after GCing has forced unmap of compacted SSTables so they can be deleted
-            // Note: GCInspector will do this already, but only sun JVM supports GCInspector
so far
-            SSTableDeletingTask.rescheduleFailedTasks();
-            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-            path = getWriteableLocationAsFile();
-        }
-
-        return path;
+        return getWriteableLocationAsFile(-1L);
     }
 
-    public File getWriteableLocationAsFile()
+    /**
+     * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes
as usable space.
+     *
+     * @throws IOError if all directories are blacklisted.
+     */
+    public File getWriteableLocationAsFile(long writeSize)
     {
-        return getLocationForDisk(getWriteableLocation());
+        return getLocationForDisk(getWriteableLocation(writeSize));
     }
 
     /**
-     * @return a non-blacklisted directory with the most free space and least current tasks.
+     * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes
as usable space.
      *
      * @throws IOError if all directories are blacklisted.
      */
-    public DataDirectory getWriteableLocation()
+    public DataDirectory getWriteableLocation(long writeSize)
     {
-        List<DataDirectory> candidates = new ArrayList<>();
+        List<DataDirectoryCandidate> candidates = new ArrayList<>();
+
+        long totalAvailable = 0L;
 
         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted
for writes.
+        boolean tooBig = false;
         for (DataDirectory dataDir : dataDirectories)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                 continue;
-            candidates.add(dataDir);
+            DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+            // exclude directory if its total writeSize does not fit to data directory
+            if (candidate.availableSpace < writeSize)
+            {
+                tooBig = true;
+                continue;
+            }
+            candidates.add(candidate);
+            totalAvailable += candidate.availableSpace;
         }
 
         if (candidates.isEmpty())
-            throw new IOError(new IOException("All configured data directories have been
blacklisted as unwritable for erroring out"));
+            if (tooBig)
+                return null;
+            else
+                throw new IOError(new IOException("All configured data directories have been
blacklisted as unwritable for erroring out"));
 
-        // sort directories by free space, in _descending_ order.
-        Collections.sort(candidates);
+        // shortcut for single data directory systems
+        if (candidates.size() == 1)
+            return candidates.get(0).dataDirectory;
+
+        sortWriteableCandidates(candidates, totalAvailable);
 
-        // sort directories by load, in _ascending_ order.
-        Collections.sort(candidates, new Comparator<DataDirectory>()
+        return pickWriteableDirectory(candidates);
+    }
+
+    // separated for unit testing
+    static DataDirectory pickWriteableDirectory(List<DataDirectoryCandidate> candidates)
+    {
+        // weighted random
+        double rnd = ThreadLocalRandom.current().nextDouble();
+        for (DataDirectoryCandidate candidate : candidates)
         {
-            public int compare(DataDirectory a, DataDirectory b)
-            {
-                return a.currentTasks.get() - b.currentTasks.get();
-            }
-        });
+            rnd -= candidate.perc;
+            if (rnd <= 0)
+                return candidate.dataDirectory;
+        }
+
+        // last resort
+        return candidates.get(0).dataDirectory;
+    }
+
+    // separated for unit testing
+    static void sortWriteableCandidates(List<DataDirectoryCandidate> candidates, long
totalAvailable)
+    {
+        // calculate free-space-percentage
+        for (DataDirectoryCandidate candidate : candidates)
+            candidate.calcFreePerc(totalAvailable);
 
-        return candidates.get(0);
+        // sort directories by perc
+        Collections.sort(candidates);
     }
 
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
@@ -336,31 +362,50 @@ public class Directories
         return new SSTableLister();
     }
 
-    public static class DataDirectory implements Comparable<DataDirectory>
+    public static class DataDirectory
     {
         public final File location;
-        public final AtomicInteger currentTasks = new AtomicInteger();
-        public final AtomicLong estimatedWorkingSize = new AtomicLong();
 
         public DataDirectory(File location)
         {
             this.location = location;
         }
 
-        /**
-         * @return estimated available disk space for bounded directory,
-         * excluding the expected size written by tasks in the queue.
-         */
-        public long getEstimatedAvailableSpace()
+        public long getAvailableSpace()
         {
-            // Load factor of 0.9 we do not want to use the entire disk that is too risky.
-            return location.getUsableSpace() - estimatedWorkingSize.get();
+            return location.getUsableSpace();
         }
+    }
+
+    static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
+    {
+        final DataDirectory dataDirectory;
+        final long availableSpace;
+        double perc;
 
-        public int compareTo(DataDirectory o)
+        public DataDirectoryCandidate(DataDirectory dataDirectory)
         {
-            // we want to sort by free space in descending order
-            return -1 * Longs.compare(getEstimatedAvailableSpace(), o.getEstimatedAvailableSpace());
+            this.dataDirectory = dataDirectory;
+            this.availableSpace = dataDirectory.getAvailableSpace();
+        }
+
+        void calcFreePerc(long totalAvailableSpace)
+        {
+            double w = availableSpace;
+            w /= totalAvailableSpace;
+            perc = w;
+        }
+
+        public int compareTo(DataDirectoryCandidate o)
+        {
+            if (this == o)
+                return 0;
+
+            int r = Double.compare(perc, o.perc);
+            if (r != 0)
+                return -r;
+            // last resort
+            return System.identityHashCode(this) - System.identityHashCode(o);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 272b533..61628ff 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -658,9 +658,11 @@ public class CompactionManager implements CompactionManagerMBean
     {
         assert !cfs.isIndex();
 
+        Set<SSTableReader> sstableSet = Collections.singleton(sstable);
+
         if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
         {
-            cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(),
OperationType.CLEANUP);
+            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(),
OperationType.CLEANUP);
             return;
         }
         if (!needsCleanup(sstable, ranges))
@@ -674,13 +676,13 @@ public class CompactionManager implements CompactionManagerMBean
         long totalkeysWritten = 0;
 
         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
-                                               (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+                                               (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables();
+        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet,
OperationType.CLEANUP));
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
@@ -691,7 +693,7 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
         SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge,
false);
         List<SSTableReader> finished;
-        try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable),
getDefaultGcBefore(cfs)))
+        try (CompactionController controller = new CompactionController(cfs, sstableSet,
getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize,
sstable.getSSTableMetadata().repairedAt, sstable));
 
@@ -994,7 +996,7 @@ public class CompactionManager implements CompactionManagerMBean
             Set<SSTableReader> sstableAsSet = new HashSet<>();
             sstableAsSet.add(sstable);
 
-            File destination = cfs.directories.getDirectoryForNewSSTables();
+            File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
OperationType.ANTICOMPACTION));
             SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet,
sstable.maxDataAge, false);
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet,
sstable.maxDataAge, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0cd71f2..2f53ab9 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -81,12 +81,13 @@ public class Scrubber implements Closeable
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
 
+        List<SSTableReader> toScrub = Collections.singletonList(sstable);
+
         // Calculate the expected compacted filesize
-        this.destination = cfs.directories.getDirectoryForNewSSTables();
+        this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub,
OperationType.SCRUB));
         if (destination == null)
             throw new IOException("disk full");
 
-        List<SSTableReader> toScrub = Collections.singletonList(sstable);
         // If we run scrub offline, we should never purge tombstone, as we cannot know if
other sstable have data that the tombstone deletes.
         this.controller = isOffline
                         ? new ScrubController(cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..6d453e5 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,24 +34,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         while (true)
         {
             writeSize = getExpectedWriteSize();
-            directory = getDirectories().getWriteableLocation();
+            directory = getDirectories().getWriteableLocation(writeSize);
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
         }
         if (directory == null)
             throw new RuntimeException("Insufficient disk space to write " + writeSize +
" bytes");
 
-        directory.currentTasks.incrementAndGet();
-        directory.estimatedWorkingSize.addAndGet(writeSize);
-        try
-        {
-            runWith(getDirectories().getLocationForDisk(directory));
-        }
-        finally
-        {
-            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
-            directory.currentTasks.decrementAndGet();
-        }
+        runWith(getDirectories().getLocationForDisk(directory));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ae8c798..79cea8e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3537,26 +3537,6 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
         return isClientMode;
     }
 
-    public synchronized void requestGC()
-    {
-        if (hasUnreclaimedSpace())
-        {
-            logger.info("requesting GC to free disk space");
-            System.gc();
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-    }
-
-    private boolean hasUnreclaimedSpace()
-    {
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            if (cfs.hasUnreclaimedSpace())
-                return true;
-        }
-        return false;
-    }
-
     public String getOperationMode()
     {
         return operationMode.toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 3014549..c96a925 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -116,7 +115,7 @@ public class StreamReader
 
     protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt)
throws IOException
     {
-        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 33da3d1..aa18954 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -113,7 +116,10 @@ public class StreamReceiveTask extends StreamTask
             }
             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-            StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(),
UUID.randomUUID());
+            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size()
* 256);
+            if (lockfiledir == null)
+                throw new IOError(new IOException("All disks full"));
+            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
             lockfile.create(task.sstables);
             List<SSTableReader> readers = new ArrayList<>();
             for (SSTableWriter writer : task.sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 9e6b26b..34d10d2 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,7 +45,10 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class DirectoriesTest
 {
@@ -238,4 +242,128 @@ public class DirectoriesTest
             }
         }
     }
+
+    @Test
+    public void testDiskFreeSpace()
+    {
+        DataDirectory[] dataDirectories = new DataDirectory[]
+                                          {
+                                          new DataDirectory(new File("/nearlyFullDir1"))
+                                          {
+                                              public long getAvailableSpace()
+                                              {
+                                                  return 11L;
+                                              }
+                                          },
+                                          new DataDirectory(new File("/nearlyFullDir2"))
+                                          {
+                                              public long getAvailableSpace()
+                                              {
+                                                  return 10L;
+                                              }
+                                          },
+                                          new DataDirectory(new File("/uniformDir1"))
+                                          {
+                                              public long getAvailableSpace()
+                                              {
+                                                  return 1000L;
+                                              }
+                                          },
+                                          new DataDirectory(new File("/uniformDir2"))
+                                          {
+                                              public long getAvailableSpace()
+                                              {
+                                                  return 999L;
+                                              }
+                                          },
+                                          new DataDirectory(new File("/veryFullDir"))
+                                          {
+                                              public long getAvailableSpace()
+                                              {
+                                                  return 4L;
+                                              }
+                                          }
+                                          };
+
+        // directories should be sorted
+        // 1. by their free space ratio
+        // before weighted random is applied
+        List<Directories.DataDirectoryCandidate> candidates = getWriteableDirectories(dataDirectories,
0L);
+        assertSame(dataDirectories[2], candidates.get(0).dataDirectory); // available: 1000
+        assertSame(dataDirectories[3], candidates.get(1).dataDirectory); // available: 999
+        assertSame(dataDirectories[0], candidates.get(2).dataDirectory); // available: 11
+        assertSame(dataDirectories[1], candidates.get(3).dataDirectory); // available: 10
+
+        // check for writeSize == 5
+        Map<DataDirectory, DataDirectory> testMap = new IdentityHashMap<>();
+        for (int i=0; ; i++)
+        {
+            candidates = getWriteableDirectories(dataDirectories, 5L);
+            assertEquals(4, candidates.size());
+
+            DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+            testMap.put(dir, dir);
+
+            assertFalse(testMap.size() > 4);
+            if (testMap.size() == 4)
+            {
+                // at least (rule of thumb) 100 iterations to see whether there are more
(wrong) directories returned
+                if (i >= 100)
+                    break;
+            }
+
+            // random weighted writeable directory algorithm fails to return all possible
directories after
+            // many tries
+            if (i >= 10000000)
+                fail();
+        }
+
+        // check for writeSize == 11
+        testMap.clear();
+        for (int i=0; ; i++)
+        {
+            candidates = getWriteableDirectories(dataDirectories, 11L);
+            assertEquals(3, candidates.size());
+            for (Directories.DataDirectoryCandidate candidate : candidates)
+                assertTrue(candidate.dataDirectory.getAvailableSpace() >= 11L);
+
+            DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+            testMap.put(dir, dir);
+
+            assertFalse(testMap.size() > 3);
+            if (testMap.size() == 3)
+            {
+                // at least (rule of thumb) 100 iterations
+                if (i >= 100)
+                    break;
+            }
+
+            // random weighted writeable directory algorithm fails to return all possible
directories after
+            // many tries
+            if (i >= 10000000)
+                fail();
+        }
+    }
+
+    private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[]
dataDirectories, long writeSize)
+    {
+        // copied from Directories.getWriteableLocation(long)
+        List<Directories.DataDirectoryCandidate> candidates = new ArrayList<>();
+
+        long totalAvailable = 0L;
+
+        for (DataDirectory dataDir : dataDirectories)
+            {
+                Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
+                // exclude directory if its total writeSize does not fit to data directory
+                if (candidate.availableSpace < writeSize)
+                    continue;
+                candidates.add(candidate);
+                totalAvailable += candidate.availableSpace;
+            }
+
+        Directories.sortWriteableCandidates(candidates, totalAvailable);
+
+        return candidates;
+    }
 }


Mime
View raw message