Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 28888 invoked from network); 10 Mar 2011 20:28:43 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Mar 2011 20:28:43 -0000 Received: (qmail 26634 invoked by uid 500); 10 Mar 2011 20:28:43 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 26614 invoked by uid 500); 10 Mar 2011 20:28:43 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 26606 invoked by uid 99); 10 Mar 2011 20:28:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Mar 2011 20:28:43 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Mar 2011 20:28:41 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8C7A123889C5; Thu, 10 Mar 2011 20:28:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1080338 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/ contrib/pig/bin/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassand... Date: Thu, 10 Mar 2011 20:28:18 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110310202818.8C7A123889C5@eris.apache.org> Author: jbellis Date: Thu Mar 10 20:28:17 2011 New Revision: 1080338 URL: http://svn.apache.org/viewvc?rev=1080338&view=rev Log: merge from 0.7 Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/build.xml cassandra/trunk/conf/cassandra.yaml cassandra/trunk/contrib/ (props changed) cassandra/trunk/contrib/pig/bin/pig_cassandra cassandra/trunk/contrib/pig/build.xml cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7:1026516-1079936 +/cassandra/branches/cassandra-0.7:1026516-1080312 /cassandra/branches/cassandra-0.7.0:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3:774578-796573 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Mar 10 20:28:17 2011 @@ -27,6 +27,9 @@ * avoid writing empty rows when scrubbing tombstoned rows (CASSANDRA-2296) * fix assertion error in range and index scans for CL < ALL (CASSANDRA-2282) + * fix commitlog replay when flush position refers to data that didn't + get synced before server died (CASSANDRA-2285) + * fix fd leak in sstable2json with non-mmap'd i/o (CASSANDRA-2304) 0.7.3 Modified: cassandra/trunk/build.xml URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/build.xml (original) +++ cassandra/trunk/build.xml Thu Mar 10 20:28:17 2011 @@ -631,7 +631,16 @@ - + + + + + + + + + + Modified: cassandra/trunk/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/conf/cassandra.yaml (original) +++ cassandra/trunk/conf/cassandra.yaml Thu Mar 10 20:28:17 2011 @@ -87,7 +87,7 @@ commitlog_rotation_threshold_in_mb: 128 # performing the sync. commitlog_sync: periodic -# the other option is "timed," where writes may be acked immediately +# the other option is "periodic" where writes may be acked immediately # and the CommitLog is simply synced every commitlog_sync_period_in_ms # milliseconds. commitlog_sync_period_in_ms: 10000 Propchange: cassandra/trunk/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009 -/cassandra/branches/cassandra-0.7/contrib:1026516-1079936 +/cassandra/branches/cassandra-0.7/contrib:1026516-1080312 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573 Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/contrib/pig/bin/pig_cassandra (original) +++ cassandra/trunk/contrib/pig/bin/pig_cassandra Thu Mar 10 20:28:17 2011 @@ -24,13 +24,13 @@ for jar in $cassandra_home/lib/*.jar $ca CLASSPATH=$CLASSPATH:$jar done -# cassandra_loadfunc jar. -LOADFUNC_JAR=`ls -1 $cwd/../build/*.jar` -if [ ! -e $LOADFUNC_JAR ]; then - echo "Unable to locate cassandra_loadfunc jar: please run ant." >&2 +# cassandra_storage jar. +STORAGE_JAR=`ls -1 $cwd/../build/*.jar` +if [ ! -e $STORAGE_JAR ]; then + echo "Unable to locate cassandra_storage jar: please run ant." >&2 exit 1 fi -CLASSPATH=$CLASSPATH:$LOADFUNC_JAR +CLASSPATH=$CLASSPATH:$STORAGE_JAR if [ "x$PIG_HOME" = "x" ]; then echo "PIG_HOME not set: requires Pig >= 0.7.0" >&2 Modified: cassandra/trunk/contrib/pig/build.xml URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/contrib/pig/build.xml (original) +++ cassandra/trunk/contrib/pig/build.xml Thu Mar 10 20:28:17 2011 @@ -17,7 +17,7 @@ ~ specific language governing permissions and limitations ~ under the License. --> - + @@ -32,7 +32,7 @@ - + Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1079936 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1080312 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1079936 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1080312 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1079936 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1080312 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1079936 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1080312 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 10 20:28:17 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1079936 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1080312 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 10 20:28:17 2011 @@ -32,6 +32,7 @@ import javax.management.ObjectName; import com.google.common.collect.Iterables; import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -374,7 +375,6 @@ public class ColumnFamilyStore implement { public void run() { - logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName); try { forceBlockingFlush(); @@ -388,7 +388,6 @@ public class ColumnFamilyStore implement throw new AssertionError(e); } buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name)); - logger.info("Index {} complete", indexedCfMetadata.cfName); SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName); } }; @@ -397,7 +396,8 @@ public class ColumnFamilyStore implement public void buildSecondaryIndexes(Collection sstables, SortedSet columns) { - logger.debug("Submitting index build to compactionmanager"); + logger.info(String.format("Submitting index build of %s for data in %s", + metadata.comparator.getString(columns), StringUtils.join(sstables, ", "))); Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables)); Future future = CompactionManager.instance.submitIndexBuild(this, builder); try @@ -414,6 +414,7 @@ public class ColumnFamilyStore implement { throw new RuntimeException(e); } + logger.info("Index build of " + metadata.comparator.getString(columns) + " complete"); } // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations. @@ -710,18 +711,22 @@ public class ColumnFamilyStore implement /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */ Future maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) { - if (oldMemtable.isPendingFlush()) + // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes + if (!oldMemtable.markPendingFlush()) + { + logger.debug("memtable is already pending flush; another thread must be flushing it"); return null; + } + assert memtable == oldMemtable; boolean isDropped = isIndex() ? DatabaseDescriptor.getCFMetaData(table.name, getParentColumnfamily()) == null : DatabaseDescriptor.getCFMetaData(metadata.cfId) == null; if (isDropped) - return null; // column family was dropped. no point in flushing. - - // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes - if (!oldMemtable.markPendingFlush()) + { + logger.debug("column family was dropped; no point in flushing"); return null; + } // Table.flusherLock ensures that we schedule discardCompletedSegments calls in the same order as their // contexts (commitlog position) were read, even though the flush executor is multithreaded. @@ -729,15 +734,14 @@ public class ColumnFamilyStore implement try { final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null; - logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx); // submit the memtable for any indexed sub-cfses, and our own. List icc = new ArrayList(indexedColumns.size()); - icc.add(this); - for (ColumnFamilyStore indexCfs : indexedColumns.values()) + // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not + for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), indexedColumns.values())) { - if (!indexCfs.memtable.isClean()) - icc.add(indexCfs); + if (!cfs.memtable.isClean()) + icc.add(cfs); } final CountDownLatch latch = new CountDownLatch(icc.size()); for (ColumnFamilyStore cfs : icc) @@ -756,6 +760,10 @@ public class ColumnFamilyStore implement submitFlush(pendingFlush, latch); } } + // we marked our memtable as frozen as part of the concurrency control, + // so even if there was nothing to flush we need to switch it out + if (!icc.contains(this)) + memtable = new Memtable(this); // when all the memtables have been written, including for indexes, mark the flush in the commitlog header. // a second executor makes sure the onMemtableFlushes get called in the right order, @@ -799,8 +807,17 @@ public class ColumnFamilyStore implement public Future forceFlush() { - if (memtable.isClean()) + // during index build, 2ary index memtables can be dirty even if parent is not. if so, + // we want flushLargestMemtables to flush the 2ary index ones too. + boolean clean = true; + for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), getIndexColumnFamilyStores())) + clean &= cfs.memtable.isClean(); + + if (clean) + { + logger.debug("forceFlush requested but everything is clean"); return null; + } return maybeSwitchMemtable(memtable, true); } @@ -1987,6 +2004,11 @@ public class ColumnFamilyStore implement return indexedColumns.get(column); } + public Collection getIndexColumnFamilyStores() + { + return indexedColumns.values(); + } + public ColumnFamily newIndexedColumnFamily(ByteBuffer column) { return ColumnFamily.create(indexedColumns.get(column).metadata); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Mar 10 20:28:17 2011 @@ -191,11 +191,14 @@ public class CommitLog logger.info(headerPath + " incomplete, missing or corrupt. Everything is ok, don't panic. CommitLog will be replayed from the beginning"); logger.debug("exception was", ioe); } - if (replayPosition < 0) + if (replayPosition < 0 || replayPosition > reader.length()) { + // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog + // (see https://issues.apache.org/jira/browse/CASSANDRA-2285) logger.debug("skipping replay of fully-flushed {}", file); continue; } + reader.seek(replayPosition); if (logger.isDebugEnabled()) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Mar 10 20:28:17 2011 @@ -32,6 +32,7 @@ import javax.management.ObjectName; import com.google.common.base.Charsets; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import org.apache.cassandra.db.commitlog.CommitLog; @@ -2330,14 +2331,28 @@ public class StorageService implements I ColumnFamilyStore largestByThroughput = null; for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - if (largestByOps == null || cfs.getMemtableColumnsCount() > largestByOps.getMemtableColumnsCount()) + long ops = 0; + long throughput = 0; + for (ColumnFamilyStore subordinate : Iterables.concat(Collections.singleton(cfs), cfs.getIndexColumnFamilyStores())) + { + ops += subordinate.getMemtableColumnsCount(); + throughput = subordinate.getMemtableThroughputInMB(); + } + + if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount())) + { + logger_.debug(ops + " total ops in " + cfs); largestByOps = cfs; - if (largestByThroughput == null || cfs.getMemtableThroughputInMB() > largestByThroughput.getMemtableThroughputInMB()) + } + if (throughput > 0 && (largestByThroughput == null || throughput > largestByThroughput.getMemtableThroughputInMB())) + { + logger_.debug(throughput + " total throughput in " + cfs); largestByThroughput = cfs; + } } if (largestByOps == null) { - logger_.error("Unable to reduce heap usage since there are no column families defined"); + logger_.info("Unable to reduce heap usage since there are no dirty column families"); return; } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Thu Mar 10 20:28:17 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.tools; import java.io.File; +import java.io.IOError; import java.io.IOException; import java.io.PrintStream; import java.nio.ByteBuffer; @@ -39,6 +40,7 @@ import org.apache.cassandra.config.Confi import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex; import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; @@ -101,28 +103,23 @@ public class SSTableExport * @param out output stream * @param comparator columns comparator * @param cfMetaData Column Family metadata (to get validator) + * @return pair of (number of columns serialized, last column serialized) */ - private static void serializeColumns(Iterator columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData) + private static Pair serializeColumns(Iterator columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData) { + int n = 0; + IColumn column = null; while (columns.hasNext()) { - serializeColumn(columns.next(), out, comparator, cfMetaData); + column = columns.next(); + n++; + serializeColumn(column, out, comparator, cfMetaData); if (columns.hasNext()) out.print(", "); } - } - /** - * Serialize a collection of the columns - * @param columns collection of the columns to serialize - * @param out output stream - * @param comparator columns comparator - * @param cfMetaData Column Family metadata (to get validator) - */ - private static void serializeColumns(Collection columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData) - { - serializeColumns(columns.iterator(), out, comparator, cfMetaData); + return new Pair(n, column == null ? null : column.name()); } /** @@ -198,25 +195,29 @@ public class SSTableExport IColumnIterator columns = filter.getSSTableColumnIterator(reader); - int columnCount = 0; - while (columns.hasNext()) - { - // setting new start column to the last of the current columns - startColumn = columns.next().name(); - columnCount++; - } - + Pair serialized; try { - columns = filter.getSSTableColumnIterator(reader); // iterator reset - serializeRow(columns, isSuperCF, out); + serialized = serializeRow(columns, isSuperCF, out); } catch (IOException e) { System.err.println("WARNING: Corrupt row " + key + " (skipping)."); + continue; + } + finally + { + try + { + columns.close(); + } + catch (IOException e) + { + throw new IOError(e); + } } - if (columnCount < PAGE_SIZE) + if (serialized.left < PAGE_SIZE) break; out.print(","); @@ -231,10 +232,11 @@ public class SSTableExport * @param columns columns of the row * @param isSuper true if wrapping Column Family is Super * @param out output stream + * @return pair of (number of columns serialized, last column serialized) * * @throws IOException on any I/O error. */ - private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException + private static Pair serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException { ColumnFamily columnFamily = columns.getColumnFamily(); CFMetaData cfMetaData = columnFamily.metadata(); @@ -243,9 +245,12 @@ public class SSTableExport if (isSuper) { + int n = 0; + IColumn column = null; while (columns.hasNext()) { - IColumn column = columns.next(); + column = columns.next(); + n++; out.print(asKey(comparator.getString(column.name()))); out.print("{"); @@ -254,17 +259,19 @@ public class SSTableExport out.print(", "); out.print(asKey("subColumns")); out.print("["); - serializeColumns(column.getSubColumns(), out, columnFamily.getSubComparator(), cfMetaData); + serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData); out.print("]"); out.print("}"); if (columns.hasNext()) out.print(", "); } + + return new Pair(n, column == null ? null : column.name()); } else { - serializeColumns(columns, out, comparator, cfMetaData); + return serializeColumns(columns, out, comparator, cfMetaData); } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1080338&r1=1080337&r2=1080338&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Mar 10 20:28:17 2011 @@ -108,6 +108,30 @@ public class CommitLogTest extends Clean testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF } + @Test + public void testRecoveryWithHeaderPositionGreaterThanLogLength() throws Exception + { + // Note: this can actually happen (in periodic mode) when data is flushed + // before it had time to hit the commitlog (since the header is flushed by the system) + // see https://issues.apache.org/jira/browse/CASSANDRA-2285 + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + Checksum checksum = new CRC32(); + + // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data. + dos.writeInt(1); + checksum.update(1); + dos.writeLong(checksum.getValue()); + dos.writeInt(0); + checksum.update(0); + dos.writeInt(200); + checksum.update(200); + dos.writeLong(checksum.getValue()); + dos.close(); + + testRecovery(out.toByteArray(), new byte[0]); + } + protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception { Checksum checksum = new CRC32();