cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1080338 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/ contrib/pig/bin/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassand...
Date Thu, 10 Mar 2011 20:28:18 GMT
Author: jbellis
Date: Thu Mar 10 20:28:17 2011
New Revision: 1080338

URL: http://svn.apache.org/viewvc?rev=1080338&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/build.xml
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/bin/pig_cassandra
    cassandra/trunk/contrib/pig/build.xml
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1079936
+/cassandra/branches/cassandra-0.7:1026516-1080312
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Mar 10 20:28:17 2011
@@ -27,6 +27,9 @@
  * avoid writing empty rows when scrubbing tombstoned rows (CASSANDRA-2296)
  * fix assertion error in range and index scans for CL < ALL
    (CASSANDRA-2282)
+ * fix commitlog replay when flush position refers to data that didn't
+   get synced before server died (CASSANDRA-2285)
+ * fix fd leak in sstable2json with non-mmap'd i/o (CASSANDRA-2304)
 
 
 0.7.3

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Thu Mar 10 20:28:17 2011
@@ -631,7 +631,16 @@
           <fileset dir="@{inputdir}" includes="@{filter}" />
         </batchtest>
       </junit>
-      <fail if="testfailed" message="Some @{suitename} test(s) failed."/>
+      <fail message="Some @{suitename} test(s) failed.">
+        <condition>
+            <and>
+            <isset property="testfailed"/>
+            <not>
+              <isset property="ant.test.failure.ignore"/>
+            </not>
+          </and>
+        </condition>
+      </fail>
     </sequential>
   </macrodef>
 

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Mar 10 20:28:17 2011
@@ -87,7 +87,7 @@ commitlog_rotation_threshold_in_mb: 128
 # performing the sync.
 commitlog_sync: periodic
 
-# the other option is "timed," where writes may be acked immediately
+# the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
 # milliseconds.
 commitlog_sync_period_in_ms: 10000

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1079936
+/cassandra/branches/cassandra-0.7/contrib:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/bin/pig_cassandra (original)
+++ cassandra/trunk/contrib/pig/bin/pig_cassandra Thu Mar 10 20:28:17 2011
@@ -24,13 +24,13 @@ for jar in $cassandra_home/lib/*.jar $ca
     CLASSPATH=$CLASSPATH:$jar
 done
 
-# cassandra_loadfunc jar.
-LOADFUNC_JAR=`ls -1 $cwd/../build/*.jar`
-if [ ! -e $LOADFUNC_JAR ]; then
-    echo "Unable to locate cassandra_loadfunc jar: please run ant." >&2
+# cassandra_storage jar.
+STORAGE_JAR=`ls -1 $cwd/../build/*.jar`
+if [ ! -e $STORAGE_JAR ]; then
+    echo "Unable to locate cassandra_storage jar: please run ant." >&2
     exit 1
 fi
-CLASSPATH=$CLASSPATH:$LOADFUNC_JAR
+CLASSPATH=$CLASSPATH:$STORAGE_JAR
 
 if [ "x$PIG_HOME" = "x" ]; then
     echo "PIG_HOME not set: requires Pig >= 0.7.0" >&2

Modified: cassandra/trunk/contrib/pig/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Thu Mar 10 20:28:17 2011
@@ -17,7 +17,7 @@
  ~ specific language governing permissions and limitations
  ~ under the License.
  -->
-<project basedir="." default="jar" name="cassandra_loadfunc">
+<project basedir="." default="jar" name="cassandra_storage">
     <!-- stores the environment for locating PIG_HOME -->
     <property environment="env" />
     <property name="cassandra.dir" value="../.." />
@@ -32,7 +32,7 @@
     <property name="build.lib" value="${basedir}/lib" />
     <property name="build.out" value="${basedir}/build" />
     <property name="build.classes" value="${build.out}/classes" />
-    <property name="final.name" value="cassandra_loadfunc" />
+    <property name="final.name" value="cassandra_storage" />
 
     <path id="pig.classpath">
         <fileset file="${env.PIG_HOME}/pig*.jar" />

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 10 20:28:17 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1079936
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1080312
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 10 20:28:17 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -374,7 +375,6 @@ public class ColumnFamilyStore implement
         {
             public void run()
             {
-                logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName);
                 try
                 {
                     forceBlockingFlush();
@@ -388,7 +388,6 @@ public class ColumnFamilyStore implement
                     throw new AssertionError(e);
                 }
                 buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
-                logger.info("Index {} complete", indexedCfMetadata.cfName);
                 SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
             }
         };
@@ -397,7 +396,8 @@ public class ColumnFamilyStore implement
 
     public void buildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
     {
-        logger.debug("Submitting index build to compactionmanager");
+        logger.info(String.format("Submitting index build of %s for data in %s",
+                                  metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
         Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables));
         Future future = CompactionManager.instance.submitIndexBuild(this, builder);
         try
@@ -414,6 +414,7 @@ public class ColumnFamilyStore implement
         {
             throw new RuntimeException(e);
         }
+        logger.info("Index build of " + metadata.comparator.getString(columns) + " complete");
     }
 
     // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations.
@@ -710,18 +711,22 @@ public class ColumnFamilyStore implement
     /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already.  threadsafe. */
     Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
     {
-        if (oldMemtable.isPendingFlush())
+        // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes
+        if (!oldMemtable.markPendingFlush())
+        {
+            logger.debug("memtable is already pending flush; another thread must be flushing it");
             return null;
+        }
+        assert memtable == oldMemtable;
 
         boolean isDropped = isIndex()
                           ? DatabaseDescriptor.getCFMetaData(table.name, getParentColumnfamily()) == null
                           : DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
         if (isDropped)
-            return null; // column family was dropped. no point in flushing.
-
-        // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes
-        if (!oldMemtable.markPendingFlush())
+        {
+            logger.debug("column family was dropped; no point in flushing");
             return null;
+        }
 
         // Table.flusherLock ensures that we schedule discardCompletedSegments calls in the same order as their
         // contexts (commitlog position) were read, even though the flush executor is multithreaded.
@@ -729,15 +734,14 @@ public class ColumnFamilyStore implement
         try
         {
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null;
-            logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx);
 
             // submit the memtable for any indexed sub-cfses, and our own.
             List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>(indexedColumns.size());
-            icc.add(this);
-            for (ColumnFamilyStore indexCfs : indexedColumns.values())
+            // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
+            for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), indexedColumns.values()))
             {
-                if (!indexCfs.memtable.isClean())
-                    icc.add(indexCfs);
+                if (!cfs.memtable.isClean())
+                    icc.add(cfs);
             }
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
@@ -756,6 +760,10 @@ public class ColumnFamilyStore implement
                     submitFlush(pendingFlush, latch);
                 }
             }
+            // we marked our memtable as frozen as part of the concurrency control,
+            // so even if there was nothing to flush we need to switch it out
+            if (!icc.contains(this))
+                memtable = new Memtable(this);
 
             // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
             // a second executor makes sure the onMemtableFlushes get called in the right order,
@@ -799,8 +807,17 @@ public class ColumnFamilyStore implement
 
     public Future<?> forceFlush()
     {
-        if (memtable.isClean())
+        // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
+        // we want flushLargestMemtables to flush the 2ary index ones too.
+        boolean clean = true;
+        for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), getIndexColumnFamilyStores()))
+            clean &= cfs.memtable.isClean();
+
+        if (clean)
+        {
+            logger.debug("forceFlush requested but everything is clean");
             return null;
+        }
 
         return maybeSwitchMemtable(memtable, true);
     }
@@ -1987,6 +2004,11 @@ public class ColumnFamilyStore implement
         return indexedColumns.get(column);
     }
 
+    public Collection<ColumnFamilyStore> getIndexColumnFamilyStores()
+    {
+        return indexedColumns.values();
+    }
+
     public ColumnFamily newIndexedColumnFamily(ByteBuffer column)
     {
         return ColumnFamily.create(indexedColumns.get(column).metadata);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Mar 10 20:28:17 2011
@@ -191,11 +191,14 @@ public class CommitLog
                     logger.info(headerPath + " incomplete, missing or corrupt.  Everything is ok, don't panic.  CommitLog will be replayed from the beginning");
                     logger.debug("exception was", ioe);
                 }
-                if (replayPosition < 0)
+                if (replayPosition < 0 || replayPosition > reader.length())
                 {
+                    // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
+                    // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
                     logger.debug("skipping replay of fully-flushed {}", file);
                     continue;
                 }
+
                 reader.seek(replayPosition);
 
                 if (logger.isDebugEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Mar 10 20:28:17 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -2330,14 +2331,28 @@ public class StorageService implements I
         ColumnFamilyStore largestByThroughput = null;
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            if (largestByOps == null || cfs.getMemtableColumnsCount() > largestByOps.getMemtableColumnsCount())
+            long ops = 0;
+            long throughput = 0;
+            for (ColumnFamilyStore subordinate : Iterables.concat(Collections.singleton(cfs), cfs.getIndexColumnFamilyStores()))
+            {
+                ops += subordinate.getMemtableColumnsCount();
+                throughput = subordinate.getMemtableThroughputInMB();
+            }
+
+            if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount()))
+            {
+                logger_.debug(ops + " total ops in " + cfs);
                 largestByOps = cfs;
-            if (largestByThroughput == null || cfs.getMemtableThroughputInMB() > largestByThroughput.getMemtableThroughputInMB())
+            }
+            if (throughput > 0 && (largestByThroughput == null || throughput > largestByThroughput.getMemtableThroughputInMB()))
+            {
+                logger_.debug(throughput + " total throughput in " + cfs);
                 largestByThroughput = cfs;
+            }
         }
         if (largestByOps == null)
         {
-            logger_.error("Unable to reduce heap usage since there are no column families defined");
+            logger_.info("Unable to reduce heap usage since there are no dirty column families");
             return;
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Thu Mar 10 20:28:17 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
+import java.io.IOError;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
@@ -39,6 +40,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
 import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
@@ -101,28 +103,23 @@ public class SSTableExport
      * @param out output stream
      * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)
+     * @return pair of (number of columns serialized, last column serialized)
      */
-    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+    private static Pair<Integer, ByteBuffer> serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
     {
+        int n = 0;
+        IColumn column = null;
         while (columns.hasNext())
         {
-            serializeColumn(columns.next(), out, comparator, cfMetaData);
+            column = columns.next();
+            n++;
+            serializeColumn(column, out, comparator, cfMetaData);
 
             if (columns.hasNext())
                 out.print(", ");
         }
-    }
 
-    /**
-     * Serialize a collection of the columns
-     * @param columns collection of the columns to serialize
-     * @param out output stream
-     * @param comparator columns comparator
-     * @param cfMetaData Column Family metadata (to get validator)
-     */
-    private static void serializeColumns(Collection<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
-    {
-        serializeColumns(columns.iterator(), out, comparator, cfMetaData);
+        return new Pair<Integer, ByteBuffer>(n, column == null ? null : column.name());
     }
 
     /**
@@ -198,25 +195,29 @@ public class SSTableExport
 
             IColumnIterator columns = filter.getSSTableColumnIterator(reader);
 
-            int columnCount = 0;
-            while (columns.hasNext())
-            {
-                // setting new start column to the last of the current columns
-                startColumn = columns.next().name();
-                columnCount++;
-            }
-
+            Pair<Integer, ByteBuffer> serialized;
             try
             {
-                columns = filter.getSSTableColumnIterator(reader); // iterator reset
-                serializeRow(columns, isSuperCF, out);
+                serialized = serializeRow(columns, isSuperCF, out);
             }
             catch (IOException e)
             {
                 System.err.println("WARNING: Corrupt row " + key + " (skipping).");
+                continue;
+            }
+            finally
+            {
+                try
+                {
+                    columns.close();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
             }
 
-            if (columnCount < PAGE_SIZE)
+            if (serialized.left < PAGE_SIZE)
                 break;
 
             out.print(",");
@@ -231,10 +232,11 @@ public class SSTableExport
      * @param columns columns of the row
      * @param isSuper true if wrapping Column Family is Super
      * @param out output stream
+     * @return pair of (number of columns serialized, last column serialized)
      *
      * @throws IOException on any I/O error.
      */
-    private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
+    private static Pair<Integer, ByteBuffer> serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
     {
         ColumnFamily columnFamily = columns.getColumnFamily();
         CFMetaData cfMetaData = columnFamily.metadata();
@@ -243,9 +245,12 @@ public class SSTableExport
 
         if (isSuper)
         {
+            int n = 0;
+            IColumn column = null;
             while (columns.hasNext())
             {
-                IColumn column = columns.next();
+                column = columns.next();
+                n++;
 
                 out.print(asKey(comparator.getString(column.name())));
                 out.print("{");
@@ -254,17 +259,19 @@ public class SSTableExport
                 out.print(", ");
                 out.print(asKey("subColumns"));
                 out.print("[");
-                serializeColumns(column.getSubColumns(), out, columnFamily.getSubComparator(), cfMetaData);
+                serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData);
                 out.print("]");
                 out.print("}");
 
                 if (columns.hasNext())
                     out.print(", ");
             }
+
+            return new Pair<Integer, ByteBuffer>(n, column == null ? null : column.name());
         }
         else
         {
-            serializeColumns(columns, out, comparator, cfMetaData);
+            return serializeColumns(columns, out, comparator, cfMetaData);
         }
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1080338&r1=1080337&r2=1080338&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Thu Mar 10 20:28:17 2011
@@ -108,6 +108,30 @@ public class CommitLogTest extends Clean
         testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
     }
 
+    @Test
+    public void testRecoveryWithHeaderPositionGreaterThanLogLength() throws Exception
+    {
+        // Note: this can actually happen (in periodic mode) when data is flushed
+        // before it had time to hit the commitlog (since the header is flushed by the system)
+        // see https://issues.apache.org/jira/browse/CASSANDRA-2285
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(out);
+        Checksum checksum = new CRC32();
+
+        // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data.
+        dos.writeInt(1);
+        checksum.update(1);
+        dos.writeLong(checksum.getValue());
+        dos.writeInt(0);
+        checksum.update(0);
+        dos.writeInt(200);
+        checksum.update(200);
+        dos.writeLong(checksum.getValue());
+        dos.close();
+
+        testRecovery(out.toByteArray(), new byte[0]);
+    }
+
     protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
     {
         Checksum checksum = new CRC32();



Mime
View raw message