cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r967213 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/...
Date Fri, 23 Jul 2010 19:22:59 GMT
Author: brandonwilliams
Date: Fri Jul 23 19:22:58 2010
New Revision: 967213

URL: http://svn.apache.org/viewvc?rev=967213&view=rev
Log:
Keep persistent row size and column count statistics.  Patch by brandonwilliams and jbellis;
reviewed by jbellis for CASSANDRA-1155

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jul 23 19:22:58 2010
@@ -44,6 +44,7 @@ dev
  * significantly faster reads from row cache (CASSANDRA-1267)
  * take advantage of row cache during range queries (CASSANDRA-1302)
  * make GCGraceSeconds a per-ColumnFamily value (CASSANDRA-1276)
+ * keep persistent row size and column count statistics (CASSANDRA-1155)
 
 
 0.6.4

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Jul 23 19:22:58
2010
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.clock.Abs
 import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.StatisticsTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -64,6 +65,7 @@ public final class CFMetaData
     public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF,
ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(),
"hinted handoff data", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 1, Collections.<byte[],
ColumnDefinition>emptyMap());
     public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF,
ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, new TimestampReconciler(),
"individual schema mutations", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 2, Collections.<byte[],
ColumnDefinition>emptyMap());
     public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF,
ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(),
"current state of the schema", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 3, Collections. <byte[],
ColumnDefinition>emptyMap());
+    public static final CFMetaData StatisticsCf = new CFMetaData(Table.SYSTEM_TABLE, StatisticsTable.STATISTICS_CF,
ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(),
"persistent CF statistics for the local node", 0, false, 0.01, DEFAULT_GC_GRACE_SECONDS, 4,
Collections.<byte[], ColumnDefinition>emptyMap());
 
     /**
      * @return An immutable mapping of (ksname,cfname) to id.

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 23
19:22:58 2010
@@ -348,12 +348,14 @@ public class DatabaseDescriptor
             KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE, LocalStrategy.class,
1, new CFMetaData[]{CFMetaData.StatusCf,
                                                                                         
         CFMetaData.HintsCf,
                                                                                         
         CFMetaData.MigrationsCf,
-                                                                                        
         CFMetaData.SchemaCf
+                                                                                        
         CFMetaData.SchemaCf,
+                                                                                        
         CFMetaData.StatisticsCf
             });
             CFMetaData.map(CFMetaData.StatusCf);
             CFMetaData.map(CFMetaData.HintsCf);
             CFMetaData.map(CFMetaData.MigrationsCf);
             CFMetaData.map(CFMetaData.SchemaCf);
+            CFMetaData.map(CFMetaData.StatisticsCf);
             tables.put(Table.SYSTEM_TABLE, systemMeta);
             
             /* Load the seeds for node contact points */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Jul 23
19:22:58 2010
@@ -69,18 +69,20 @@ public class ColumnFamilySerializer impl
         serializeForSSTable(columnFamily, dos);
     }
 
-    public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
+    public int serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
     {
         try
         {
             serializeCFInfo(columnFamily, dos);
 
             Collection<IColumn> columns = columnFamily.getSortedColumns();
-            dos.writeInt(columns.size());
+            int count = columns.size();
+            dos.writeInt(count);
             for (IColumn column : columns)
             {
                 columnFamily.getColumnSerializer().serialize(column, dos);
             }
+            return count;
         }
         catch (IOException e)
         {
@@ -95,10 +97,10 @@ public class ColumnFamilySerializer impl
         columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos);
     }
 
-    public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
+    public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
     {
         ColumnIndexer.serialize(columnFamily, dos);
-        serializeForSSTable(columnFamily, dos);
+        return serializeForSSTable(columnFamily, dos);
     }
 
     public ColumnFamily deserialize(DataInput dis) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 23 19:22:58
2010
@@ -98,7 +98,7 @@ public class ColumnFamilyStore implement
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()),
                                                new NamedThreadFactory("FLUSH-WRITER-POOL"));
-    private static ExecutorService commitLogUpdater_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+    private static ExecutorService postFlushExecutor_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
     
     private static final FilenameFilter DB_NAME_FILTER = new FilenameFilter()
     {
@@ -133,10 +133,6 @@ public class ColumnFamilyStore implement
     private LatencyTracker readStats_ = new LatencyTracker();
     private LatencyTracker writeStats_ = new LatencyTracker();
 
-    private long minRowCompactedSize = 0L;
-    private long maxRowCompactedSize = 0L;
-    private long rowsCompactedTotalSize = 0L;
-    private long rowsCompactedCount = 0L;
     final CFMetaData metadata;
 
     ColumnFamilyStore(String table, String columnFamilyName, IPartitioner partitioner, int
generation, CFMetaData metadata)
@@ -257,32 +253,41 @@ public class ColumnFamilyStore implement
         }
     }
 
-    public void addToCompactedRowStats(long rowsize)
-    {
-        if (minRowCompactedSize < 1 || rowsize < minRowCompactedSize)
-            minRowCompactedSize = rowsize;
-        if (rowsize > maxRowCompactedSize)
-            maxRowCompactedSize = rowsize;
-        rowsCompactedCount++;
-        rowsCompactedTotalSize += rowsize;
-    }
-
     public long getMinRowCompactedSize()
     {
-        return minRowCompactedSize;
+        long min = 0;
+        for (SSTableReader sstable : ssTables_)
+        {
+           if (min == 0 || sstable.getEstimatedRowSize().min() < min)
+               min = sstable.getEstimatedRowSize().min();
+        }
+        return min;
     }
 
     public long getMaxRowCompactedSize()
     {
-        return maxRowCompactedSize;
+        long max = 0;
+        for (SSTableReader sstable : ssTables_)
+        {
+            if (sstable.getEstimatedRowSize().max() > max)
+                max = sstable.getEstimatedRowSize().max();
+        }
+        return max;
     }
 
     public long getMeanRowCompactedSize()
     {
-        if (rowsCompactedCount > 0)
-            return rowsCompactedTotalSize / rowsCompactedCount;
-        else
-            return 0L;
+        long sum = 0;
+        long count = 0;
+        for (SSTableReader sstable : ssTables_)
+        {
+            if (sstable.getEstimatedRowSize().median() > 0)
+            {
+                sum += sstable.getEstimatedRowSize().median();
+                count++;
+            }
+        }
+        return count > 0 ? sum / count : 0;
     }
 
     public static ColumnFamilyStore createColumnFamilyStore(String table, String columnFamily)
@@ -414,7 +419,7 @@ public class ColumnFamilyStore implement
             // when all the memtables have been written, including for indexes, mark the
flush in the commitlog header.
             // a second executor makes sure the onMemtableFlushes get called in the right
order,
             // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
-            return commitLogUpdater_.submit(new WrappedRunnable()
+            return postFlushExecutor_.submit(new WrappedRunnable()
             {
                 public void runMayThrow() throws InterruptedException, IOException
                 {
@@ -1295,7 +1300,12 @@ public class ColumnFamilyStore implement
             }
         };
 
-        return commitLogUpdater_.submit(runnable);
+        return postFlushExecutor_.submit(runnable);
+    }
+
+    public static Future<?> submitPostFlush(Runnable runnable)
+    {
+        return postFlushExecutor_.submit(runnable);
     }
 
     public long getBloomFilterFalsePositives()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Jul 23 19:22:58
2010
@@ -348,9 +348,6 @@ public class CompactionManager implement
 
                 writer.append(row);
                 totalkeysWritten++;
-
-                long rowsize = writer.getFilePointer() - prevpos;
-                cfs.addToCompactedRowStats(rowsize);
             }
         }
         finally

Added: cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java?rev=967213&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/StatisticsTable.java Fri Jul 23 19:22:58
2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+public class StatisticsTable
+{
+    private static Logger logger = LoggerFactory.getLogger(StatisticsTable.class);
+    public static final String STATISTICS_CF = "Statistics";
+    public static final byte[] ROWSIZE_SC = "RowSize".getBytes(UTF_8);
+    public static final byte[] COLUMNCOUNT_SC = "ColumnCount".getBytes(UTF_8);
+
+    private static DecoratedKey decorate(byte[] key)
+    {
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
+    public static void persistSSTableStatistics(Descriptor desc, EstimatedHistogram rowsizes,
EstimatedHistogram columncounts) throws IOException
+    {
+        String filename = getRelativePath(desc.filenameFor(SSTable.COMPONENT_DATA));
+        if (isTemporary(filename) || desc.ksname.equals(Table.SYSTEM_TABLE))
+            return;
+        long[] rowbuckets = rowsizes.getBucketOffsets();
+        long[] rowvalues = rowsizes.get(false);
+        long[] columnbuckets = columncounts.getBucketOffsets();
+        long[] columnvalues = columncounts.get(false);
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, filename.getBytes(UTF_8));
+        for (int i=0; i<rowbuckets.length; i++)
+        {
+            QueryPath path = new QueryPath(STATISTICS_CF, ROWSIZE_SC, FBUtilities.toByteArray(rowbuckets[i]));
+            rm.add(path, FBUtilities.toByteArray(rowvalues[i]), new TimestampClock(System.currentTimeMillis()));
+        }
+        for (int i=0; i<columnbuckets.length; i++)
+        {
+            QueryPath path = new QueryPath(STATISTICS_CF, COLUMNCOUNT_SC, FBUtilities.toByteArray(columnbuckets[i]));
+            rm.add(path, FBUtilities.toByteArray(columnvalues[i]), new TimestampClock(System.currentTimeMillis()));
+        }
+        rm.apply();
+        if (logger.isDebugEnabled())
+            logger.debug("Recorded SSTable statistics for " + filename);
+    }
+
+    public static void deleteSSTableStatistics(String filepath) throws IOException
+    {
+        String filename = getRelativePath(filepath);
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, filename.getBytes(UTF_8));
+        QueryPath path = new QueryPath(STATISTICS_CF);
+        rm.delete(path, new TimestampClock(System.currentTimeMillis()));
+        rm.apply();
+        if (logger.isDebugEnabled())
+            logger.debug("Deleted SSTable statistics for " + filename);
+    }
+
+    private static long[] getSSTableStatistics(String filepath, byte[] superCol) throws IOException
+    {
+        long[] rv;
+        String filename = getRelativePath(filepath);
+        QueryPath path = new QueryPath(STATISTICS_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(decorate(filename.getBytes(UTF_8)),
path, superCol);
+        ColumnFamily cf = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(STATISTICS_CF).getColumnFamily(filter);
+        if (cf == null)
+            return new long[0];
+        IColumn scolumn = cf.getColumn(superCol);
+        rv = new long[scolumn.getSubColumns().size()];
+        int i = 0;
+        for (IColumn col : scolumn.getSubColumns()) {
+            rv[i] = ByteBuffer.wrap(col.value()).getLong();
+            i++;
+        }
+        return rv;
+    }
+
+    public static long [] getSSTableRowSizeStatistics(String filename) throws IOException
+    {
+        return getSSTableStatistics(filename, ROWSIZE_SC);
+    }
+
+    public static long [] getSSTableColumnCountStatistics(String filename) throws IOException
+    {
+        return getSSTableStatistics(filename, COLUMNCOUNT_SC);
+    }
+
+    private static String getRelativePath(String filename)
+    {
+        for (String prefix : DatabaseDescriptor.getAllDataFileLocations())
+        {
+            if (filename.startsWith(prefix))
+               return filename.substring(prefix.length());
+        }
+        return filename;
+    }
+
+    private static boolean isTemporary(String filename)
+    {
+        return filename.contains("-" + SSTable.TEMPFILE_MARKER);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java Fri Jul 23
19:22:58 2010
@@ -25,4 +25,6 @@ public abstract class AbstractCompactedR
     public abstract void update(MessageDigest digest);
 
     public abstract boolean isEmpty();
+
+    public abstract int columnCount();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Fri Jul 23 19:22:58
2010
@@ -151,6 +151,11 @@ public class LazilyCompactedRow extends 
         return Iterators.filter(iter, Predicates.notNull());
     }
 
+    public int columnCount()
+    {
+        return columnCount;
+    }
+
     private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Fri Jul 23 19:22:58
2010
@@ -22,6 +22,7 @@ public class PrecompactedRow extends Abs
     private static Logger logger = LoggerFactory.getLogger(PrecompactedRow.class);
 
     private final DataOutputBuffer buffer;
+    private int columnCount = 0;
 
     public PrecompactedRow(DecoratedKey key, DataOutputBuffer buffer)
     {
@@ -61,7 +62,7 @@ public class PrecompactedRow extends Abs
             ColumnFamily cfPurged = major ? ColumnFamilyStore.removeDeleted(cf, gcBefore)
: cf;
             if (cfPurged == null)
                 return;
-            ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
+            columnCount = ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
         }
         else
         {
@@ -69,6 +70,7 @@ public class PrecompactedRow extends Abs
             try
             {
                 rows.get(0).echoData(buffer);
+                columnCount = rows.get(0).getColumnCount();
             }
             catch (IOException e)
             {
@@ -92,4 +94,9 @@ public class PrecompactedRow extends Abs
     {
         return buffer.getLength() == 0;
     }
+
+    public int columnCount()
+    {
+        return columnCount;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Jul 23 19:22:58
2010
@@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.db.StatisticsTable;
+import org.apache.cassandra.utils.EstimatedHistogram;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -61,6 +63,8 @@ public abstract class SSTable
     public static final String TEMPFILE_MARKER = "tmp";
 
     public static List<String> components = Collections.unmodifiableList(Arrays.asList(COMPONENT_FILTER,
COMPONENT_INDEX, COMPONENT_DATA));
+    protected EstimatedHistogram estimatedRowSize = new EstimatedHistogram(130);
+    protected EstimatedHistogram estimatedColumnCount = new EstimatedHistogram(112);
 
     protected SSTable(String filename, IPartitioner partitioner)
     {
@@ -75,6 +79,16 @@ public abstract class SSTable
         this.partitioner = partitioner;
     }
 
+    public EstimatedHistogram getEstimatedRowSize()
+    {
+        return estimatedRowSize;
+    }
+
+    public EstimatedHistogram getEstimatedColumnCount()
+    {
+        return estimatedColumnCount;
+    }
+
     public IPartitioner getPartitioner()
     {
         return partitioner;
@@ -119,6 +133,7 @@ public abstract class SSTable
                 FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(dataFilename)));
                 FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(dataFilename)));
                 FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(dataFilename)));
+                StatisticsTable.deleteSSTableStatistics(dataFilename);
             }
             catch (IOException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jul 23
19:22:58 2010
@@ -26,11 +26,13 @@ import java.lang.ref.Reference;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+import org.apache.cassandra.db.StatisticsTable;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.EstimatedHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,6 +142,22 @@ public class SSTableReader extends SSTab
         return count;
     }
 
+    private void loadStatistics(Descriptor desc) throws IOException
+    {
+        // skip loading stats for the system table, or we will infinitely recurse
+        if (desc.ksname.equals(Table.SYSTEM_TABLE))
+            return;
+        if (logger.isDebugEnabled())
+            logger.debug("Load statistics for " + desc);
+        long[] rowsizes = StatisticsTable.getSSTableRowSizeStatistics(desc.filenameFor(SSTable.COMPONENT_DATA));
+        long[] colcounts = StatisticsTable.getSSTableColumnCountStatistics(desc.filenameFor(SSTable.COMPONENT_DATA));
+        if (rowsizes.length > 0)
+        {
+            estimatedRowSize = new EstimatedHistogram(rowsizes);
+            estimatedColumnCount = new EstimatedHistogram(colcounts);
+        }
+    }
+
     public static SSTableReader open(String dataFileName) throws IOException
     {
         return open(Descriptor.fromFilename(dataFileName));
@@ -191,17 +209,38 @@ public class SSTableReader extends SSTab
             sstable.load(false);
             sstable.loadBloomFilter();
         }
+        sstable.loadStatistics(desc);
 
         return sstable;
     }
 
+    SSTableReader(Descriptor desc,
+                  IPartitioner partitioner,
+                  SegmentedFile ifile,
+                  SegmentedFile dfile,
+                  IndexSummary indexSummary,
+                  BloomFilter bloomFilter,
+                  long maxDataAge)
+            throws IOException
+    {
+        super(desc, partitioner);
+        this.maxDataAge = maxDataAge;
+
+
+        this.ifile = ifile;
+        this.dfile = dfile;
+        this.indexSummary = indexSummary;
+        this.bf = bloomFilter;
+    }
+
     /**
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */
-    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile
ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws
IOException
+    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile
ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram
rowsize,
+                                      EstimatedHistogram columncount) throws IOException
     {
         assert desc != null && partitioner != null && ifile != null &&
dfile != null && isummary != null && bf != null;
-        return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge);
+        return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge,
rowsize, columncount);
     }
 
     SSTableReader(Descriptor desc,
@@ -210,7 +249,9 @@ public class SSTableReader extends SSTab
                      SegmentedFile dfile,
                      IndexSummary indexSummary,
                      BloomFilter bloomFilter,
-                     long maxDataAge)
+                     long maxDataAge,
+                     EstimatedHistogram rowsize,
+                     EstimatedHistogram columncount)
     throws IOException
     {
         super(desc, partitioner);
@@ -221,6 +262,8 @@ public class SSTableReader extends SSTab
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
+        estimatedRowSize = rowsize;
+        estimatedColumnCount = columncount;
     }
 
     public void setTrackedBy(SSTableTracker tracker)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jul 23
19:22:58 2010
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.StatisticsTable;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
@@ -34,6 +36,7 @@ import org.apache.cassandra.io.util.Segm
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class SSTableWriter extends SSTable
 {
@@ -83,6 +86,8 @@ public class SSTableWriter extends SSTab
         long currentPosition = beforeAppend(row.key);
         FBUtilities.writeShortByteArray(row.key.key, dataFile);
         row.write(dataFile);
+        estimatedRowSize.add(dataFile.getFilePointer() - currentPosition);
+        estimatedColumnCount.add(row.columnCount());
         afterAppend(row.key, currentPosition);
     }
 
@@ -94,7 +99,7 @@ public class SSTableWriter extends SSTab
         long sizePosition = dataFile.getFilePointer();
         dataFile.writeLong(-1);
         // write out row data
-        ColumnFamily.serializer().serializeWithIndexes(cf, dataFile);
+        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, dataFile);
         // seek back and write the row size (not including the size Long itself)
         long endPosition = dataFile.getFilePointer();
         dataFile.seek(sizePosition);
@@ -102,6 +107,8 @@ public class SSTableWriter extends SSTab
         // finally, reset for next row
         dataFile.seek(endPosition);
         afterAppend(decoratedKey, startPosition);
+        estimatedRowSize.add(endPosition - startPosition);
+        estimatedColumnCount.add(columnCount);
     }
 
     public void append(DecoratedKey decoratedKey, byte[] value) throws IOException
@@ -128,12 +135,21 @@ public class SSTableWriter extends SSTab
         dataFile.close(); // calls force
 
         // remove the 'tmp' marker from all components
-        Descriptor newdesc = rename(desc);
+        final Descriptor newdesc = rename(desc);
+
+        Runnable runnable = new WrappedRunnable()
+        {
+            protected void runMayThrow() throws IOException
+            {
+                StatisticsTable.persistSSTableStatistics(newdesc, estimatedRowSize, estimatedColumnCount);
+            }
+        };
+        ColumnFamilyStore.submitPostFlush(runnable);
 
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile,
iwriter.summary, iwriter.bf, maxDataAge);
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile,
iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
         iwriter = null;
         dbuilder = null;
         return sstable;

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java Fri Jul 23
19:22:58 2010
@@ -25,27 +25,58 @@ public class EstimatedHistogram
 {
 
     /**
-     * This series starts at 1 and grows by 1.2 each time (rounding down and removing duplicates).
It goes from 1
-     * to around 30M, which will give us timing resolution from microseconds to 30 seconds,
with less precision
-     * as the numbers get larger.
+     * The series of values to which the counts in `buckets` correspond:
+     * 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 18, 22, etc.
+     * Thus, a `buckets` of [0, 0, 1, 10] would mean we had seen one value of 3 and 10 values
of 4.
+     *
+     * The series starts at 1 and grows by 1.2 each time (rounding and removing duplicates).
It goes from 1
+     * to around 36M by default (creating 90+1 buckets), which will give us timing resolution
from microseconds to
+     * 36 seconds, with less precision as the numbers get larger.
      */
-    private static final long[] bucketOffsets = {
-            1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 18, 22, 26, 31, 38, 46, 55, 66, 79, 95, 114,
137, 164, 197, 237, 284, 341, 410, 492, 590,
-            708, 850, 1020, 1224, 1469, 1763, 2116, 2539, 3047, 3657, 4388, 5266, 6319, 7583,
9100, 10920, 13104, 15725, 18870, 22644,
-            27173, 32608, 39130, 46956, 56347, 67617, 81140, 97368, 116842, 140210, 168252,
201903, 242283, 290740, 348888, 418666,
-            502400, 602880, 723456, 868147, 1041776, 1250132, 1500158, 1800190, 2160228,
2592274, 3110728, 3732874, 4479449, 5375339,
-            6450407, 7740489, 9288586, 11146304, 13375565, 16050678, 19260813, 23112976,
27735572, 33282686
-    };
-
-    private static final int numBuckets = bucketOffsets.length + 1;
+    private long[] bucketOffsets;
+    private int numBuckets;
 
     final AtomicLongArray buckets;
 
     public EstimatedHistogram()
     {
+        makeOffsets(90);
+        buckets = new AtomicLongArray(numBuckets);
+    }
+
+    public EstimatedHistogram(int bucketCount)
+    {
+        makeOffsets(bucketCount);
         buckets = new AtomicLongArray(numBuckets);
     }
 
+    public EstimatedHistogram(long[] bucketData)
+    {
+        makeOffsets(bucketData.length - 1);
+        buckets = new AtomicLongArray(bucketData);
+    }
+
+    private void makeOffsets(int size)
+    {
+        bucketOffsets = new long[size];
+        long last = 1;
+        bucketOffsets[0] = last;
+        for(int i = 1; i < size; i++)
+        {
+            long next = Math.round(last * 1.2);
+            if (next == last)
+                next++;
+            bucketOffsets[i] = next;
+            last = next;
+        }
+        numBuckets = bucketOffsets.length + 1;
+    }
+
+    public long[] getBucketOffsets()
+    {
+        return bucketOffsets;
+    }
+    
     public void add(long n)
     {
         int index = Arrays.binarySearch(bucketOffsets, n);
@@ -74,4 +105,40 @@ public class EstimatedHistogram
 
         return rv;
     }
+
+    public long min()
+    {
+        for (int i = 0; i < numBuckets; i++)
+        {
+            if (buckets.get(i) > 0)
+                return bucketOffsets[i == 0 ? 0 : i - 1];
+        }
+        return 0;
+    }
+
+    public long max()
+    {
+        for (int i = numBuckets - 1; i >= 0; i--)
+        {
+            if (buckets.get(i) > 0)
+                return bucketOffsets[i == 0 ? 0 : i - 1];
+        }
+        return 0;
+    }
+
+    public long median()
+    {
+        long max = 0;
+        long median = 0;
+        for (int i = 0; i < numBuckets; i++)
+        {
+            if (max < 1 || buckets.get(i) > max)
+            {
+                max = buckets.get(i);
+                if (max > 0)
+                    median = bucketOffsets[i == 0 ? 0 : i - 1];
+            }
+        }
+        return median;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java Fri Jul
23 19:22:58 2010
@@ -110,4 +110,22 @@ public class SSTableReaderTest extends C
             assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == -1;
         }
     }
+
+    @Test
+    public void testPersistentStatistics() throws IOException, ExecutionException, InterruptedException
+    {
+
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        for (int j = 0; j < 100; j += 2)
+        {
+            byte[] key = String.valueOf(j).getBytes();
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add(new QueryPath("Standard1", null, "0".getBytes()), new byte[0], new TimestampClock(j));
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        assert store.getMaxRowCompactedSize() != 0;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java?rev=967213&r1=967212&r2=967213&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java Fri Jul
23 19:22:58 2010
@@ -31,20 +31,23 @@ public class EstimatedHistogramTest
         EstimatedHistogram histogram = new EstimatedHistogram();
 
         histogram.add(0L);
-        assertEquals(1, histogram.get(true)[0]);
+        assertEquals(1, histogram.get(false)[0]);
 
         histogram.add(33282687);
-        assertEquals(1, histogram.get(true)[histogram.buckets.length()-1]);
+        assertEquals(1, histogram.get(false)[histogram.buckets.length()-1]);
 
         histogram.add(1);
-        assertEquals(1, histogram.get(true)[1]);
+        assertEquals(1, histogram.get(false)[1]);
 
         histogram.add(9);
-        assertEquals(1, histogram.get(true)[8]);
+        assertEquals(1, histogram.get(false)[8]);
 
-        histogram.add(23);
-        histogram.add(24);
-        histogram.add(25);
-        assertEquals(3, histogram.get(true)[13]);
+        histogram.add(20);
+        histogram.add(21);
+        histogram.add(22);
+        assertEquals(3, histogram.get(false)[13]);
+        assertEquals(1, histogram.min());
+        assertEquals(25109160, histogram.max());
+        assertEquals(20, histogram.median());
     }
 }



Mime
View raw message