cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/3] git commit: Add SSTableSplitter tool to split sstables offline
Date Thu, 22 Aug 2013 08:18:45 GMT
Updated Branches:
  refs/heads/cassandra-2.0 da55d76d2 -> 05929c05c


Add SSTableSplitter tool to split sstables offline

patch by slebresne; reviewed by krummas for CASSANDRA-4766


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39066b72
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39066b72
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39066b72

Branch: refs/heads/cassandra-2.0
Commit: 39066b722607fa88e75d5bc772bd52f1ec8914a0
Parents: 9bb4d93
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Sun Nov 11 14:54:43 2012 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Aug 22 10:08:10 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +-
 bin/sstablesplit                                |  50 ++++
 debian/cassandra.install                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java |  14 +-
 .../db/compaction/SSTableSplitter.java          | 105 ++++++++
 .../cassandra/tools/StandaloneSplitter.java     | 256 +++++++++++++++++++
 7 files changed, 427 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1c963c..e887c27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@
  * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
  * Properly handle parsing huge map and set literals (CASSANDRA-5893)
  * Fix LCS L0 compaction may overlap in L1 (CASSANDRA-5907)
+ * New sstablesplit tool to split large sstables offline (CASSANDRA-4766)
 Merged from 1.1:
  * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 11281ee..491a438 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,9 +18,10 @@ using the provided 'sstableupgrade' tool.
 
 Features
 --------
-    - A history of executed nodetool commands is now captured. 
+    - A history of executed nodetool commands is now captured.
       It can be found in ~/.cassandra/nodetool.history. Other tools output files
       (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
+    - A new sstablesplit utility allows splitting large sstables offline.
 
 Defaults
 --------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/bin/sstablesplit
----------------------------------------------------------------------
diff --git a/bin/sstablesplit b/bin/sstablesplit
new file mode 100755
index 0000000..933a67d
--- /dev/null
+++ b/bin/sstablesplit
@@ -0,0 +1,57 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Source cassandra.in.sh: $CASSANDRA_INCLUDE if set, otherwise the first readable file among
+# the default locations. It is expected to define CLASSPATH, which is checked below.
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   "$HOME/.cassandra.in.sh" \
+                   "$(dirname "$0")/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA=$(command -v java)
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+"$JAVA" -ea -cp "$CLASSPATH" -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneSplitter "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index a504b78..70d4b97 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -18,6 +18,7 @@ bin/sstableloader usr/bin
 bin/cqlsh usr/bin
 bin/sstablescrub usr/bin
 bin/sstableupgrade usr/bin
+bin/sstablesplit usr/bin
 bin/cassandra-shuffle usr/bin
 tools/bin/cassandra-stress usr/bin
 tools/bin/token-generator usr/bin

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a6b9f89..0fed0a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -98,7 +98,7 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+        CompactionController controller = getCompactionController(toCompact);
         // new sstables from flush can be added during a compaction, but only the compaction
can remove them,
         // so in our single-threaded compaction world this is a valid way of determining
if we're compacting
         // all the sstables (that existed when we started)
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractCompactionTask
                 collector.finishCompaction(ci);
         }
 
-        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+        replaceCompactedSSTables(toCompact, sstables);
         // TODO: this doesn't belong here, it should be part of the reader to load when the
tracker is wired up
         for (SSTableReader sstable : sstables)
         {
@@ -265,6 +265,25 @@ public class CompactionTask extends AbstractCompactionTask
         }
     }
 
+    /**
+     * Hook invoked once the compaction output has been written, to swap {@code compacted} for
+     * {@code replacements}. By default this delegates to the column family store with this task's
+     * compaction type; subclasses may override it to customise (or skip) that step.
+     */
+    protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+    {
+        cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+    }
+
+    /**
+     * Creates the controller used to compact {@code toCompact}. Subclasses may override it to
+     * supply a controller with different behaviour.
+     */
+    protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+    {
+        return new CompactionController(cfs, toCompact, gcBefore);
+    }
+
     protected boolean partialCompactionsAcceptable()
     {
         return !isUserDefined;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
new file mode 100644
index 0000000..214c7a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+
+public class SSTableSplitter {
+
+    private final SplittingCompactionTask task;
+
+    private CompactionInfo.Holder info;
+
+    public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+    {
+        this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+    }
+
+    public void split() throws IOException
+    {
+        task.execute(new StatsCollector());
+    }
+
+    public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+    {
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+            SSTableSplitter.this.info = ci;
+        }
+
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+            // no-op
+        }
+    }
+
+    public static class SplittingCompactionTask extends CompactionTask
+    {
+        private final int sstableSizeInMB;
+
+        public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int
sstableSizeInMB)
+        {
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+            this.sstableSizeInMB = sstableSizeInMB;
+
+            if (sstableSizeInMB <= 0)
+                throw new IllegalArgumentException("Invalid target size for SSTables, must
be > 0 (got: " + sstableSizeInMB + ")");
+        }
+
+        @Override
+        protected CompactionController getCompactionController(Collection<SSTableReader>
toCompact)
+        {
+            return new SplitController(cfs, toCompact);
+        }
+
+        @Override
+        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted,
Collection<SSTableReader> replacements)
+        {
+        }
+
+        @Override
+        protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+        {
+            return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
+        }
+
+        @Override
+        protected boolean partialCompactionsAcceptable()
+        {
+            return true;
+        }
+    }
+
+    public static class SplitController extends CompactionController
+    {
+        public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+        {
+            super(cfs, CompactionManager.NO_GC);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
new file mode 100644
index 0000000..1ce94be
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+/**
+ * Offline tool behind {@code bin/sstablesplit}: splits the given sstables, which must all belong
+ * to the same keyspace and column family, into sstables of roughly at most {@code --size} MB.
+ * Unless {@code --no-snapshot} is given the inputs are snapshotted first, and each input that is
+ * split successfully is marked compacted so that it gets removed.
+ */
+public class StandaloneSplitter
+{
+    /** Maximum size, in MB, of the output sstables when {@code --size} is not given. */
+    public static final int DEFAULT_SSTABLE_SIZE = 50;
+
+    static
+    {
+        CassandraDaemon.initLog4j();
+    }
+
+    private static final String TOOL_NAME = "sstablesplit";
+    private static final String VERBOSE_OPTION = "verbose";
+    private static final String DEBUG_OPTION = "debug";
+    private static final String HELP_OPTION = "help";
+    private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+    private static final String SIZE_OPTION = "size";
+
+    /**
+     * Splits the sstables named on the command line and exits the JVM: with status 0 once every
+     * sstable has been processed (failures to load or split an individual sstable are only
+     * reported), and with status 1 on invalid arguments or any other error.
+     */
+    public static void main(String args[]) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            // Map each existing sstable file to its descriptor and on-disk components, checking
+            // that they all belong to a single keyspace and column family.
+            String ksName = null;
+            String cfName = null;
+            Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+            for (String filename : options.filenames)
+            {
+                File file = new File(filename);
+                if (!file.exists())
+                {
+                    System.out.println("Skipping inexisting file " + file);
+                    continue;
+                }
+
+                Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+                if (pair == null)
+                {
+                    System.out.println("Skipping non sstable file " + file);
+                    continue;
+                }
+                Descriptor desc = pair.left;
+
+                if (ksName == null)
+                    ksName = desc.ksname;
+                else if (!ksName.equals(desc.ksname))
+                    throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+
+                if (cfName == null)
+                    cfName = desc.cfname;
+                else if (!cfName.equals(desc.cfname))
+                    throw new IllegalArgumentException("All sstables must be part of the same column family");
+
+                // Keep only the components that actually exist on disk for this sstable.
+                Set<Component> components = new HashSet<Component>(Arrays.asList(new Component[]{
+                    Component.DATA,
+                    Component.PRIMARY_INDEX,
+                    Component.FILTER,
+                    Component.COMPRESSION_INFO,
+                    Component.STATS
+                }));
+
+                Iterator<Component> iter = components.iterator();
+                while (iter.hasNext())
+                {
+                    Component component = iter.next();
+                    if (!(new File(desc.filenameFor(component)).exists()))
+                        iter.remove();
+                }
+                parsedFilenames.put(desc, components);
+            }
+
+            if (ksName == null || cfName == null)
+            {
+                System.err.println("No valid sstables to split");
+                System.exit(1);
+            }
+
+            // Do not load sstables since they might be broken
+            Table table = Table.openWithoutSSTables(ksName);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+            String snapshotName = "pre-split-" + System.currentTimeMillis();
+
+            // Open each sstable without validation and, unless disabled, link it into the
+            // snapshot before anything is rewritten. Sstables that fail to open are skipped.
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+            for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+            {
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+                    sstables.add(sstable);
+
+                    if (options.snapshot)
+                    {
+                        File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                        sstable.createLinks(snapshotDirectory.getPath());
+                    }
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            if (options.snapshot)
+                System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+
+            // Split each sstable. On success the original is marked compacted and its reference
+            // released; on failure the error is reported and the original is not marked compacted.
+            cfs.getDataTracker().markCompacting(sstables);
+            for (SSTableReader sstable : sstables)
+            {
+                try
+                {
+                    new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+
+                    // Remove the sstable
+                    sstable.markCompacted();
+                    sstable.releaseReference();
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0); // We need that to stop non daemonized threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    /** Command line options of the tool. */
+    private static class Options
+    {
+        public final List<String> filenames;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean snapshot;
+        // Default to DEFAULT_SSTABLE_SIZE: leaving this at 0 when --size is omitted would make
+        // every split fail, since SSTableSplitter rejects non-positive sizes.
+        public int sizeInMB = DEFAULT_SSTABLE_SIZE;
+
+        private Options(List<String> filenames)
+        {
+            this.filenames = filenames;
+        }
+
+        /**
+         * Parses {@code cmdArgs}. Prints the usage and exits the JVM on {@code --help} (status 0),
+         * and when no sstable is given or an argument is invalid (status 1).
+         */
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                {
+                    System.err.println("No sstables to split");
+                    printUsage(options);
+                    System.exit(1);
+                }
+                Options opts = new Options(Arrays.asList(args));
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+
+                if (cmd.hasOption(SIZE_OPTION))
+                    opts.sizeInMB = parseSize(cmd.getOptionValue(SIZE_OPTION), options);
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        // Parses the --size value, exiting with a usage message unless it is a positive integer.
+        private static int parseSize(String value, CmdLineOptions options)
+        {
+            try
+            {
+                int size = Integer.parseInt(value);
+                if (size > 0)
+                    return size;
+            }
+            catch (NumberFormatException e)
+            {
+                // reported below
+            }
+            errorMsg(String.format("Invalid --%s value '%s': expected a positive number of MB", SIZE_OPTION, value), options);
+            return -1; // not reached: errorMsg exits the JVM
+        }
+
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,          "display stack traces");
+            options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+            options.addOption("h",  HELP_OPTION,           "display this help message");
+            options.addOption(null, NO_SNAPSHOT_OPTION,    "don't snapshot the sstables before splitting");
+            options.addOption("s",  SIZE_OPTION, "size",   "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}


Mime
View raw message