cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/3] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0.0
Date Thu, 22 Aug 2013 08:18:46 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0.0

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/624a7a02
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/624a7a02
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/624a7a02

Branch: refs/heads/cassandra-2.0
Commit: 624a7a02f8e1240aabdac3117db64d8f3a5da99b
Parents: 5774ecb 39066b7
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Thu Aug 22 10:17:54 2013 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Aug 22 10:17:54 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   1 +
 bin/sstablesplit                                |  50 ++++
 debian/cassandra.install                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java |  14 +-
 .../db/compaction/SSTableSplitter.java          | 105 ++++++++
 .../cassandra/tools/StandaloneSplitter.java     | 256 +++++++++++++++++++
 7 files changed, 426 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 717dd4a,491a438..cd8c880
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -97,7 -21,12 +97,8 @@@ Feature
      - A history of executed nodetool commands is now captured.
        It can be found in ~/.cassandra/nodetool.history. Other tools output files
        (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
+     - A new sstablesplit utility allows to split large sstables offline.
  
 -Defaults
 ---------
 -    - After performance testing for CASSANDRA-5727, the default LCS filesize
 -      has been changed from 5MB to 160MB.
  
  
  1.2.7

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/debian/cassandra.install
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a7b6c64,0fed0a2..c27a020
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -102,13 -96,9 +102,13 @@@ public class CompactionTask extends Abs
  
          // sanity check: all sstables must belong to the same cfs
          for (SSTableReader sstable : toCompact)
 -            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 +            assert sstable.descriptor.cfname.equals(cfs.name);
 +
 +        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
  
-         CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+         CompactionController controller = getCompactionController(toCompact);
 +        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
 +
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
@@@ -237,12 -222,18 +237,12 @@@
              {
                  throw new RuntimeException(e);
              }
 -
 -            if (collector != null)
 -                collector.finishCompaction(ci);
          }
  
-         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+         replaceCompactedSSTables(toCompact, sstables);
          // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
          for (SSTableReader sstable : sstables)
 -        {
 -            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeyMap.get(sstable.descriptor).entrySet())
 -               sstable.cacheKey(entry.getKey(), entry.getValue());
 -        }
 +            sstable.preheat(cachedKeyMap.get(sstable.descriptor));
  
          if (logger.isInfoEnabled())
          {
@@@ -274,20 -265,16 +274,30 @@@
          }
      }
  
 +    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
 +    {
 +        return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
 +                                 keysPerSSTable,
 +                                 cfs.metadata,
 +                                 cfs.partitioner,
 +                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
 +    }
 +
 +    protected int getLevel()
 +    {
 +        return 0;
 +    }
 +
+     protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+     {
+         cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+     }
+ 
 -    protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
++    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+     {
+         return new CompactionController(cfs, toCompact, gcBefore);
+     }
+ 
      protected boolean partialCompactionsAcceptable()
      {
          return !isUserDefined;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 0000000,214c7a1..a22b7ca
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@@ -1,0 -1,105 +1,105 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.io.sstable.*;
+ 
+ public class SSTableSplitter {
+ 
+     private final SplittingCompactionTask task;
+ 
+     private CompactionInfo.Holder info;
+ 
+     public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+     {
+         this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+     }
+ 
+     public void split() throws IOException
+     {
+         task.execute(new StatsCollector());
+     }
+ 
+     public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+     {
+         public void beginCompaction(CompactionInfo.Holder ci)
+         {
+             SSTableSplitter.this.info = ci;
+         }
+ 
+         public void finishCompaction(CompactionInfo.Holder ci)
+         {
+             // no-op
+         }
+     }
+ 
+     public static class SplittingCompactionTask extends CompactionTask
+     {
+         private final int sstableSizeInMB;
+ 
+         public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+         {
+             super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+             this.sstableSizeInMB = sstableSizeInMB;
+ 
+             if (sstableSizeInMB <= 0)
+                 throw new IllegalArgumentException("Invalid target size for SSTables, must be > 0 (got: " + sstableSizeInMB + ")");
+         }
+ 
+         @Override
 -        protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
++        protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+         {
+             return new SplitController(cfs, toCompact);
+         }
+ 
+         @Override
+         protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+         {
+         }
+ 
+         @Override
+         protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+         {
+             return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
+         }
+ 
+         @Override
+         protected boolean partialCompactionsAcceptable()
+         {
+             return true;
+         }
+     }
+ 
+     public static class SplitController extends CompactionController
+     {
+         public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+         {
+             super(cfs, CompactionManager.NO_GC);
+         }
+ 
+         @Override
+         public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+         {
+             return false;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 0000000,1ce94be..245d824
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@@ -1,0 -1,256 +1,256 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.cassandra.tools;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import org.apache.commons.cli.*;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
 -import org.apache.cassandra.db.Table;
++import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.LeveledManifest;
+ import org.apache.cassandra.db.compaction.SSTableSplitter;
+ import org.apache.cassandra.io.sstable.*;
+ import org.apache.cassandra.service.CassandraDaemon;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+ 
+ public class StandaloneSplitter
+ {
+     public static final int DEFAULT_SSTABLE_SIZE = 50;
+ 
+     static
+     {
+         CassandraDaemon.initLog4j();
+     }
+ 
+     private static final String TOOL_NAME = "sstablessplit";
+     private static final String VERBOSE_OPTION = "verbose";
+     private static final String DEBUG_OPTION = "debug";
+     private static final String HELP_OPTION = "help";
+     private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+     private static final String SIZE_OPTION = "size";
+ 
+     public static void main(String args[]) throws IOException
+     {
+         Options options = Options.parseArgs(args);
+         try
+         {
+             // load keyspace descriptions.
+             DatabaseDescriptor.loadSchemas();
+ 
+             String ksName = null;
+             String cfName = null;
+             Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+             for (String filename : options.filenames)
+             {
+                 File file = new File(filename);
+                 if (!file.exists()) {
+                     System.out.println("Skipping inexisting file " + file);
+                     continue;
+                 }
+ 
+                 Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+                 if (pair == null) {
+                     System.out.println("Skipping non sstable file " + file);
+                     continue;
+                 }
+                 Descriptor desc = pair.left;
+ 
+                 if (ksName == null)
+                     ksName = desc.ksname;
+                 else if (!ksName.equals(desc.ksname))
+                     throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+ 
+                 if (cfName == null)
+                     cfName = desc.cfname;
+                 else if (!cfName.equals(desc.cfname))
+                     throw new IllegalArgumentException("All sstables must be part of the same column family");
+ 
+                 Set<Component> components = new HashSet<Component>(Arrays.asList(new Component[]{
+                     Component.DATA,
+                     Component.PRIMARY_INDEX,
+                     Component.FILTER,
+                     Component.COMPRESSION_INFO,
+                     Component.STATS
+                 }));
+ 
+                 Iterator<Component> iter = components.iterator();
+                 while (iter.hasNext()) {
+                     Component component = iter.next();
+                     if (!(new File(desc.filenameFor(component)).exists()))
+                         iter.remove();
+                 }
+                 parsedFilenames.put(desc, components);
+             }
+ 
+             if (ksName == null || cfName == null)
+             {
+                 System.err.println("No valid sstables to split");
+                 System.exit(1);
+             }
+ 
+             // Do not load sstables since they might be broken
 -            Table table = Table.openWithoutSSTables(ksName);
 -            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
++            Keyspace keyspace = Keyspace.openWithoutSSTables(ksName);
++            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+ 
+             String snapshotName = "pre-split-" + System.currentTimeMillis();
+ 
+             List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+             for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+             {
+                 try
+                 {
+                     SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+                     sstables.add(sstable);
+ 
+                     if (options.snapshot) {
+                         File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                         sstable.createLinks(snapshotDirectory.getPath());
+                     }
+ 
+                 }
+                 catch (Exception e)
+                 {
+                     System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+                     if (options.debug)
+                         e.printStackTrace(System.err);
+                 }
+             }
+             if (options.snapshot)
+                 System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+ 
+             cfs.getDataTracker().markCompacting(sstables);
+             for (SSTableReader sstable : sstables)
+             {
+                 try
+                 {
+                     new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+ 
+                     // Remove the sstable
 -                    sstable.markCompacted();
++                    sstable.markObsolete();
+                     sstable.releaseReference();
+                 }
+                 catch (Exception e)
+                 {
+                     System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+                     if (options.debug)
+                         e.printStackTrace(System.err);
+                 }
+             }
+             SSTableDeletingTask.waitForDeletions();
+             System.exit(0); // We need that to stop non daemonized threads
+         }
+         catch (Exception e)
+         {
+             System.err.println(e.getMessage());
+             if (options.debug)
+                 e.printStackTrace(System.err);
+             System.exit(1);
+         }
+     }
+ 
+     private static class Options
+     {
+         public final List<String> filenames;
+ 
+         public boolean debug;
+         public boolean verbose;
+         public boolean snapshot;
+         public int sizeInMB;
+ 
+         private Options(List<String> filenames)
+         {
+             this.filenames = filenames;
+         }
+ 
+         public static Options parseArgs(String cmdArgs[])
+         {
+             CommandLineParser parser = new GnuParser();
+             CmdLineOptions options = getCmdLineOptions();
+             try
+             {
+                 CommandLine cmd = parser.parse(options, cmdArgs, false);
+ 
+                 if (cmd.hasOption(HELP_OPTION))
+                 {
+                     printUsage(options);
+                     System.exit(0);
+                 }
+ 
+                 String[] args = cmd.getArgs();
+                 if (args.length == 0)
+                 {
+                     System.err.println("No sstables to split");
+                     printUsage(options);
+                     System.exit(1);
+                 }
+                 Options opts = new Options(Arrays.asList(args));
+                 opts.debug = cmd.hasOption(DEBUG_OPTION);
+                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                 opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+ 
+                 if (cmd.hasOption(SIZE_OPTION))
+                     opts.sizeInMB = Integer.valueOf(cmd.getOptionValue(SIZE_OPTION));
+ 
+                 return opts;
+             }
+             catch (ParseException e)
+             {
+                 errorMsg(e.getMessage(), options);
+                 return null;
+             }
+         }
+ 
+         private static void errorMsg(String msg, CmdLineOptions options)
+         {
+             System.err.println(msg);
+             printUsage(options);
+             System.exit(1);
+         }
+ 
+         private static CmdLineOptions getCmdLineOptions()
+         {
+             CmdLineOptions options = new CmdLineOptions();
+             options.addOption(null, DEBUG_OPTION,          "display stack traces");
+             options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+             options.addOption("h",  HELP_OPTION,           "display this help message");
+             options.addOption(null, NO_SNAPSHOT_OPTION,    "don't snapshot the sstables before splitting");
+             options.addOption("s",  SIZE_OPTION, "size",   "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+             return options;
+         }
+ 
+         public static void printUsage(CmdLineOptions options)
+         {
+             String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+             StringBuilder header = new StringBuilder();
+             header.append("--\n");
+             header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+             header.append("\n--\n");
+             header.append("Options are:");
+             new HelpFormatter().printHelp(usage, header.toString(), options, "");
+         }
+     }
+ }


Mime
View raw message