cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject [14/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5
Date Fri, 01 Apr 2016 15:55:51 GMT
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af9b9cd5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af9b9cd5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af9b9cd5

Branch: refs/heads/trunk
Commit: af9b9cd5ae248746475ada5434d08370148bbce0
Parents: 4111e4f bd4cab2
Author: Josh McKenzie <josh.mckenzie@datastax.com>
Authored: Fri Apr 1 11:51:13 2016 -0400
Committer: Josh McKenzie <josh.mckenzie@datastax.com>
Committed: Fri Apr 1 11:52:45 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 134 ++++++++++------
 .../org/apache/cassandra/db/Directories.java    |  16 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  29 +++-
 .../db/commitlog/CommitLogSegmentManager.java   |   4 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  54 -------
 .../apache/cassandra/cql3/OutOfSpaceTest.java   | 157 +++++++++++++++++++
 7 files changed, 276 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b542a67,b9376bc..73d9ab5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -109,9 -75,9 +109,10 @@@ Merged from 2.2
   * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
   * Fix paging on DISTINCT queries repeats result when first row in partition changes
     (CASSANDRA-10010)
 + * (cqlsh) Support timezone conversion using pytz (CASSANDRA-10397)
   * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
  Merged from 2.1:
+  * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448).
   * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
   * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
   * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f193c4d,aa12b80..6e01b34
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -910,6 -893,7 +910,7 @@@ public class ColumnFamilyStore implemen
          final OpOrder.Barrier writeBarrier;
          final CountDownLatch latch = new CountDownLatch(1);
          final ReplayPosition lastReplayPosition;
 -        volatile FSWriteError flushFailure = null;
++        volatile Throwable flushFailure = null;
  
          private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition
lastReplayPosition)
          {
@@@ -951,6 -936,9 +953,9 @@@
              }
  
              metric.pendingFlushes.dec();
+ 
+             if (flushFailure != null)
 -                throw flushFailure;
++                throw Throwables.propagate(flushFailure);
          }
      }
  
@@@ -1050,81 -1038,26 +1055,106 @@@
  
              metric.memtableSwitchCount.inc();
  
-             for (Memtable memtable : memtables)
+             try
              {
-                 List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
-                 long totalBytesOnDisk = 0;
-                 long maxBytesOnDisk = 0;
-                 long minBytesOnDisk = Long.MAX_VALUE;
-                 List<SSTableReader> sstables = new ArrayList<>();
-                 try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+                 for (Memtable memtable : memtables)
+                 {
 -                    // flush the memtable
 -                    MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
 -                    reclaim(memtable);
++                    flushMemtable(memtable);
+                 }
+             }
 -            catch (FSWriteError e)
++            catch (Throwable t)
+             {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog
to be discarded.
 -                postFlush.flushFailure = e;
++                JVMStabilityInspector.inspectThrowable(t);
++                postFlush.flushFailure = t;
+             }
 -
+             // signal the post-flush we've done our work
+             postFlush.latch.countDown();
+         }
+ 
++        public void flushMemtable(Memtable memtable)
++        {
++            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
++            long totalBytesOnDisk = 0;
++            long maxBytesOnDisk = 0;
++            long minBytesOnDisk = Long.MAX_VALUE;
++            List<SSTableReader> sstables = new ArrayList<>();
++            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
++            {
++                List<Memtable.FlushRunnable> flushRunnables = null;
++                List<SSTableMultiWriter> flushResults = null;
++
++                try
 +                {
 +                    // flush the memtable
-                     List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn);
++                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
-                     List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
++                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
++                }
++                catch (Throwable t)
++                {
++                    t = memtable.abortRunnables(flushRunnables, t);
++                    t = txn.abort(t);
++                    throw Throwables.propagate(t);
++                }
 +
-                     try
++                try
++                {
++                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
++                    while (writerIterator.hasNext())
 +                    {
-                         Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
-                         while (writerIterator.hasNext())
++                        @SuppressWarnings("resource")
++                        SSTableMultiWriter writer = writerIterator.next();
++                        if (writer.getFilePointer() > 0)
 +                        {
-                             @SuppressWarnings("resource")
-                             SSTableMultiWriter writer = writerIterator.next();
-                             if (writer.getFilePointer() > 0)
-                             {
-                                 writer.setOpenResult(true).prepareToCommit();
-                             }
-                             else
-                             {
-                                 maybeFail(writer.abort(null));
-                                 writerIterator.remove();
-                             }
++                            writer.setOpenResult(true).prepareToCommit();
++                        }
++                        else
++                        {
++                            maybeFail(writer.abort(null));
++                            writerIterator.remove();
 +                        }
 +                    }
-                     catch (Throwable t)
-                     {
-                         for (SSTableMultiWriter writer : flushResults)
-                             t = writer.abort(t);
-                         t = txn.abort(t);
-                         Throwables.propagate(t);
-                     }
++                }
++                catch (Throwable t)
++                {
++                    for (SSTableMultiWriter writer : flushResults)
++                        t = writer.abort(t);
++                    t = txn.abort(t);
++                    Throwables.propagate(t);
++                }
 +
-                     txn.prepareToCommit();
++                txn.prepareToCommit();
 +
-                     Throwable accumulate = null;
-                     for (SSTableMultiWriter writer : flushResults)
-                         accumulate = writer.commit(accumulate);
++                Throwable accumulate = null;
++                for (SSTableMultiWriter writer : flushResults)
++                    accumulate = writer.commit(accumulate);
 +
-                     maybeFail(txn.commit(accumulate));
++                maybeFail(txn.commit(accumulate));
 +
-                     for (SSTableMultiWriter writer : flushResults)
++                for (SSTableMultiWriter writer : flushResults)
++                {
++                    Collection<SSTableReader> flushedSSTables = writer.finished();
++                    for (SSTableReader sstable : flushedSSTables)
 +                    {
-                         Collection<SSTableReader> flushedSSTables = writer.finished();
-                         for (SSTableReader sstable : flushedSSTables)
++                        if (sstable != null)
 +                        {
-                             if (sstable != null)
-                             {
-                                 sstables.add(sstable);
-                                 long size = sstable.bytesOnDisk();
-                                 totalBytesOnDisk += size;
-                                 maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
-                                 minBytesOnDisk = Math.min(minBytesOnDisk, size);
-                             }
++                            sstables.add(sstable);
++                            long size = sstable.bytesOnDisk();
++                            totalBytesOnDisk += size;
++                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
++                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
-                 memtable.cfs.replaceFlushed(memtable, sstables);
-                 reclaim(memtable);
-                 logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest
{} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
 +            }
-             // signal the post-flush we've done our work
-             postFlush.latch.countDown();
++            memtable.cfs.replaceFlushed(memtable, sstables);
++            reclaim(memtable);
++            logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest
{} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
 +        }
 +
          private void reclaim(final Memtable memtable)
          {
              // issue a read barrier for reclaiming the memory, and offload the wait to another
thread

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 73c6007,5f23baa..5bd2cf0
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -320,7 -306,7 +320,7 @@@ public class Directorie
       * which may return any non-blacklisted directory - even a data directory that has no
usable space.
       * Do not use this method in production code.
       *
--     * @throws IOError if all directories are blacklisted.
++     * @throws FSWriteError if all directories are blacklisted.
       */
      public File getDirectoryForNewSSTables()
      {
@@@ -330,11 -316,11 +330,14 @@@
      /**
       * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes
as usable space.
       *
--     * @throws IOError if all directories are blacklisted.
++     * @throws FSWriteError if all directories are blacklisted.
       */
      public File getWriteableLocationAsFile(long writeSize)
      {
--        return getLocationForDisk(getWriteableLocation(writeSize));
++        File location = getLocationForDisk(getWriteableLocation(writeSize));
++        if (location == null)
++            throw new FSWriteError(new IOException("No configured data directory contains
enough space to write " + writeSize + " bytes"), "");
++        return location;
      }
  
      /**
@@@ -366,9 -352,9 +369,10 @@@
      }
  
      /**
--     * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes
as usable space.
++     * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes
as usable space, null if
++     * there is not enough space left in all directories.
       *
--     * @throws IOError if all directories are blacklisted.
++     * @throws FSWriteError if all directories are blacklisted.
       */
      public DataDirectory getWriteableLocation(long writeSize)
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 244c7b6,5d5f7bf..bc054b4
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -24,6 -25,6 +24,7 @@@ import java.util.concurrent.atomic.Atom
  import java.util.concurrent.atomic.AtomicReference;
  
  import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Throwables;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -44,10 -44,11 +45,11 @@@ import org.apache.cassandra.db.rows.Unf
  import org.apache.cassandra.dht.*;
  import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
  import org.apache.cassandra.index.transactions.UpdateTransaction;
++import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 -import org.apache.cassandra.io.util.DiskAwareRunnable;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
@@@ -255,32 -254,9 +257,47 @@@ public class Memtable implements Compar
          return partitions.size();
      }
  
 -    public FlushRunnable flushRunnable()
 +    public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
      {
 -        return new FlushRunnable(lastReplayPosition.get());
 +        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
 +
 +        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
 +            return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(),
txn));
 +
 +        return createFlushRunnables(localRanges, txn);
 +    }
 +
 +    private List<FlushRunnable> createFlushRunnables(List<Range<Token>>
localRanges, LifecycleTransaction txn)
 +    {
 +        assert cfs.getPartitioner().splitter().isPresent();
 +
 +        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 +        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges,
cfs.getPartitioner(), locations);
 +        List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
 +        PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
 +        ReplayPosition context = lastReplayPosition.get();
-         for (int i = 0; i < boundaries.size(); i++)
++        try
 +        {
-             PartitionPosition t = boundaries.get(i);
-             runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
-             rangeStart = t;
++            for (int i = 0; i < boundaries.size(); i++)
++            {
++                PartitionPosition t = boundaries.get(i);
++                runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
++                rangeStart = t;
++            }
++            return runnables;
++        }
++        catch (Throwable e)
++        {
++            throw Throwables.propagate(abortRunnables(runnables, e));
 +        }
-         return runnables;
++    }
++
++    public Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t)
++    {
++        if (runnables != null)
++            for (FlushRunnable runnable : runnables)
++                t = runnable.writer.abort(t);
++        return t;
      }
  
      public String toString()
@@@ -387,12 -345,21 +404,12 @@@
                                      * 1.2); // bloom filter and row index overhead
  
              this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 -        }
  
 -        public long getExpectedWriteSize()
 -        {
 -            return estimatedSize;
 -        }
 +            if (flushLocation == null)
-                 writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))),
columnsCollector.get(), statsCollector.get());
++                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)),
columnsCollector.get(), statsCollector.get());
 +            else
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)),
columnsCollector.get(), statsCollector.get());
  
 -        protected void runMayThrow() throws Exception
 -        {
 -            long writeSize = getExpectedWriteSize();
 -            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
 -            File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
 -            assert sstableDirectory != null : "Flush task is not bound to any disk";
 -            Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
 -            cfs.replaceFlushed(Memtable.this, sstables);
          }
  
          protected Directories getDirectories()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index a083218,1a15d6f..0000000
deleted file mode 100644,100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ /dev/null
@@@ -1,54 -1,42 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.cassandra.io.util;
 -
 -import java.io.IOException;
--
--import org.apache.cassandra.db.Directories;
 -import org.apache.cassandra.io.FSWriteError;
--import org.apache.cassandra.utils.WrappedRunnable;
--
--public abstract class DiskAwareRunnable extends WrappedRunnable
--{
--    protected Directories.DataDirectory getWriteDirectory(long writeSize)
--    {
-         Directories.DataDirectory directory;
-         directory = getDirectory();
- 
-         if (directory == null) // ok panic - write anywhere
-             directory = getDirectories().getWriteableLocation(writeSize);
- 
 -        Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
--        if (directory == null)
-             throw new RuntimeException("Insufficient disk space to write " + writeSize +
" bytes");
 -            throw new FSWriteError(new IOException("Insufficient disk space to write " +
writeSize + " bytes"), "");
--
--        return directory;
--    }
--
--    /**
--     * Get sstable directories for the CF.
--     * @return Directories instance for the CF.
--     */
--    protected abstract Directories getDirectories();
-     protected abstract Directories.DataDirectory getDirectory();
- 
-     /**
-      * Called if no disk is available with free space for the full write size.
-      * @return true if the scope of the task was successfully reduced.
-      */
-     public boolean reduceScopeForLimitedSpace()
-     {
-         return false;
-     }
--}


Mime
View raw message