Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 98682 invoked from network); 8 Feb 2011 20:37:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Feb 2011 20:37:01 -0000 Received: (qmail 24432 invoked by uid 500); 8 Feb 2011 20:37:01 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 24412 invoked by uid 500); 8 Feb 2011 20:37:01 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 24403 invoked by uid 99); 8 Feb 2011 20:37:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Feb 2011 20:37:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Feb 2011 20:36:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E4C3E23889D7; Tue, 8 Feb 2011 20:36:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1068562 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java Date: Tue, 08 Feb 2011 20:36:35 -0000 To: commits@cassandra.apache.org From: eevans@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110208203635.E4C3E23889D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: eevans Date: Tue Feb 8 20:36:35 2011 New Revision: 1068562 URL: http://svn.apache.org/viewvc?rev=1068562&view=rev Log: paging of large rows in sstable2json Patch by Pavel Yaskevich; reviewed by eevans for CASSANDRA-2041 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1068562&r1=1068561&r2=1068562&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java Tue Feb 8 20:36:35 2011 @@ -24,15 +24,17 @@ import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.IColumnIterator; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.commons.cli.*; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.ExpiringColumn; -import org.apache.cassandra.db.IColumn; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.*; @@ -44,7 +46,8 @@ import static org.apache.cassandra.utils */ public class SSTableExport { - private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024; + // size of the columns page + private static final int PAGE_SIZE = 1000; private static final String KEY_OPTION = "k"; private static final String EXCLUDEKEY_OPTION = "x"; @@ -69,79 +72,168 @@ public class SSTableExport Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only"); options.addOption(optEnumerate); } - + + /** + * Wraps given string into quotes + * @param val string to quote + * @return quoted string + */ private static String quote(String val) { return String.format("\"%s\"", val); } - + + /** + * JSON Hash Key serializer + * @param val value to set as a key + * @return JSON Hash key + */ private static String asKey(String val) { return String.format("%s: ", quote(val)); } - - private static void serializeColumns(PrintStream outs, Collection cols, AbstractType comp) + + /** + * Serialize columns using given column iterator + * @param columns column iterator + * @param out output stream + */ + private static void serializeColumns(Iterator columns, PrintStream out) { - outs.print("["); + while (columns.hasNext()) + { + serializeColumn(columns.next(), out); - Iterator iter = cols.iterator(); - while (iter.hasNext()) + if (columns.hasNext()) + out.print(", "); + } + } + + /** + * Serialize a collection of the columns + * @param columns collection of the columns to serialize + * @param out output stream + */ + private static void serializeColumns(Collection columns, PrintStream out) + { + serializeColumns(columns.iterator(), out); + } + + /** + * Serialize a given column to the JSON format + * @param column column presentation + * @param out output stream + */ + private static void serializeColumn(IColumn column, PrintStream out) + { + out.print("["); + out.print(quote(bytesToHex(column.name()))); + out.print(", "); + out.print(quote(bytesToHex(column.value()))); + out.print(", "); + out.print(column.timestamp()); + out.print(", "); + out.print(column.isMarkedForDelete()); + + if (column instanceof ExpiringColumn) + { + out.print(", "); + out.print(((ExpiringColumn) column).getTimeToLive()); + out.print(", "); + out.print(column.getLocalDeletionTime()); + } + + out.print("]"); + } + + /** + * Get portion of the columns and serialize in loop while not more columns left in the row + * @param reader SSTableReader for given SSTable + * @param row SSTableIdentityIterator row representation with Column Family + * @param key Decorated Key for the required row + * @param out output stream + */ + private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out) + { + ColumnFamily columnFamily = row.getColumnFamily(); + boolean isSuperCF = columnFamily.isSuper(); + ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; // initial column name, "blank" for first + + out.print(asKey(bytesToHex(key.key))); + + out.print(isSuperCF ? "{" : "["); + + while (true) { - outs.print("["); - IColumn column = iter.next(); - outs.print(quote(bytesToHex(column.name()))); - outs.print(", "); - outs.print(quote(bytesToHex(column.value()))); - outs.print(", "); - outs.print(column.timestamp()); - outs.print(", "); - outs.print(column.isMarkedForDelete()); - if (column instanceof ExpiringColumn) + QueryFilter filter = QueryFilter.getSliceFilter(key, + new QueryPath(columnFamily.metadata().tableName), + startColumn, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + false, + PAGE_SIZE); + + IColumnIterator columns = filter.getSSTableColumnIterator(reader); + + int columnCount = 0; + while (columns.hasNext()) { - outs.print(", "); - outs.print(((ExpiringColumn) column).getTimeToLive()); - outs.print(", "); - outs.print(column.getLocalDeletionTime()); + // setting new start column to the last of the current columns + startColumn = columns.next().name(); + columnCount++; } - outs.print("]"); - if (iter.hasNext()) - outs.print(", "); + + try + { + columns = filter.getSSTableColumnIterator(reader); // iterator reset + serializeRow(columns, isSuperCF, out); + } + catch (IOException e) + { + System.err.println("WARNING: Corrupt row " + key + " (skipping)."); + } + + if (columnCount < PAGE_SIZE) + break; } - - outs.print("]"); + + out.print(isSuperCF ? "}" : "]"); } - - private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException - { - ColumnFamily cf = row.getColumnFamilyWithColumns(); - AbstractType comparator = cf.getComparator(); - outs.print(asKey(bytesToHex(row.getKey().key))); - if (cf.isSuper()) + /** + * Serialize a row with already given column iterator + * + * @param columns columns of the row + * @param isSuper true if wrapping Column Family is Super + * @param out output stream + * + * @throws IOException on any I/O error. + */ + private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException + { + if (isSuper) { - outs.print("{ "); - - Iterator iter = cf.getSortedColumns().iterator(); - while (iter.hasNext()) + while (columns.hasNext()) { - IColumn column = iter.next(); - outs.print(asKey(bytesToHex(column.name()))); - outs.print("{"); - outs.print(asKey("deletedAt")); - outs.print(column.getMarkedForDeleteAt()); - outs.print(", "); - outs.print(asKey("subColumns")); - serializeColumns(outs, column.getSubColumns(), comparator); - outs.print("}"); - if (iter.hasNext()) - outs.print(", "); + IColumn column = columns.next(); + + out.print(asKey(bytesToHex(column.name()))); + out.print("{"); + out.print(asKey("deletedAt")); + out.print(column.getMarkedForDeleteAt()); + out.print(", "); + out.print(asKey("subColumns")); + out.print("["); + serializeColumns(column.getSubColumns(), out); + out.print("]"); + out.print("}"); + + if (columns.hasNext()) + out.print(", "); } - - outs.print("}"); } else { - serializeColumns(outs, cf.getSortedColumns(), comparator); + serializeColumns(columns, out); } } @@ -176,117 +268,97 @@ public class SSTableExport /** * Export specific rows from an SSTable and write the resulting JSON to a PrintStream. * - * @param ssTableFile the SSTable to export the rows from + * @param ssTableFile the SSTableScanner to export the rows from * @param outs PrintStream to write the output to - * @param keys the keys corresponding to the rows to export + * @param toExport the keys corresponding to the rows to export + * @param excludes keys to exclude from export * @throws IOException on failure to read/write input/output */ - public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes) - throws IOException + public static void export(String ssTableFile, PrintStream outs, Collection toExport, String[] excludes) throws IOException { SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile)); - SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE); - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - Set excludeSet = new HashSet(); - int i = 0; + SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE); + + IPartitioner partitioner = StorageService.getPartitioner(); + + for (String toExclude : excludes) + { + toExport.remove(toExclude); // excluding key from export + } - if (excludes != null) - excludeSet = new HashSet(Arrays.asList(excludes)); - outs.println("{"); - // last key to compare order + int i = 0; + + // last key to compare order DecoratedKey lastKey = null; - - for (String key : keys) + + for (String key : toExport) { - if (excludeSet.contains(key)) - continue; - DecoratedKey dk = partitioner.decorateKey(hexToBytes(key)); + DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key)); - // validate order of the keys in the sstable - if (lastKey != null && lastKey.compareTo(dk) > 0 ) - throw new IOException("Key out of order! " + lastKey + " > " + dk); - lastKey = dk; + if (lastKey != null && lastKey.compareTo(decoratedKey) > 0) + throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey); - scanner.seekTo(dk); - - i++; + lastKey = decoratedKey; - if (scanner.hasNext()) - { - SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + scanner.seekTo(decoratedKey); - try - { - serializeRow(outs, row); - } - catch (IOException ioexc) - { - System.err.println("WARNING: Corrupt row " + key + " (skipping)."); - continue; - } - catch (OutOfMemoryError oom) - { - System.err.println("ERROR: Out of memory deserializing row " + key); - continue; - } + if (!scanner.hasNext()) + continue; - if (i != 1) - outs.println(","); - } + serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs); + + if (i != 0) + outs.println(","); + + i++; } - + outs.println("\n}"); outs.flush(); + + scanner.close(); } // This is necessary to accommodate the test suite since you cannot open a Reader more // than once from within the same process. static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException { - SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE); Set excludeSet = new HashSet(); if (excludes != null) excludeSet = new HashSet(Arrays.asList(excludes)); - outs.println("{"); SSTableIdentityIterator row; + SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE); - boolean elementWritten = false; + outs.println("{"); + + int i = 0; + + // collecting keys to export while (scanner.hasNext()) { row = (SSTableIdentityIterator) scanner.next(); - if (excludeSet.contains(bytesToHex(row.getKey().key))) + String currentKey = bytesToHex(row.getKey().key); + + if (excludeSet.contains(currentKey)) continue; - else if (elementWritten) + else if (i != 0) outs.println(","); - try - { - serializeRow(outs, row); + serializeRow(reader, row, row.getKey(), outs); - // used to decide should we put ',' after previous row or not - if (!elementWritten) - elementWritten = true; - } - catch (IOException ioexcep) - { - System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping)."); - elementWritten = false; - } - catch (OutOfMemoryError oom) - { - System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key)); - elementWritten = false; - } + i++; } - - outs.printf("%n}%n"); + + outs.println("\n}"); outs.flush(); + + scanner.close(); } /** @@ -294,18 +366,21 @@ public class SSTableExport * * @param ssTableFile the SSTable to export * @param outs PrintStream to write the output to + * @param excludes keys to exclude from export + * * @throws IOException on failure to read/write input/output */ public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException { - SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile)); - export(reader, outs, excludes); + export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes); } /** * Export an SSTable and write the resulting JSON to standard out. * * @param ssTableFile SSTable to export + * @param excludes keys to exclude from export + * * @throws IOException on failure to read/write SSTable/standard out */ public static void export(String ssTableFile, String[] excludes) throws IOException @@ -318,7 +393,9 @@ public class SSTableExport * export the contents of the SSTable to JSON. * * @param args command lines arguments + * * @throws IOException on failure to open/read/write files or output streams + * @throws ConfigurationException on configuration failure (wrong params given) */ public static void main(String[] args) throws IOException, ConfigurationException { @@ -364,7 +441,7 @@ public class SSTableExport else { if ((keys != null) && (keys.length > 0)) - export(ssTableFileName, System.out, keys, excludes); + export(ssTableFileName, System.out, Arrays.asList(keys), excludes); else export(ssTableFileName, excludes); }