cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r921958 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ test/unit/org/apache/cassandra/db/commitlog/
Date Thu, 11 Mar 2010 17:51:45 GMT
Author: gdusbabek
Date: Thu Mar 11 17:51:45 2010
New Revision: 921958

URL: http://svn.apache.org/viewvc?rev=921958&view=rev
Log:
Store the column family ID map in the commit log header (CLH). Patch by Gary Dusbabek, reviewed by Jonathan Ellis.

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Mar 11 17:51:45 2010
@@ -18,8 +18,12 @@
 
 package org.apache.cassandra.config;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -39,24 +43,18 @@ public final class CFMetaData
     public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
 
     private static final AtomicInteger idGen = new AtomicInteger(0);
-    private static final Map<Integer, String> idToName = new HashMap<Integer, String>();
     
-    // this only gets used by a toString method.
-    public static final String getName(int id)
-    {
-        return idToName.get(id);
-    }
+    private static final Map<Pair<String, String>, Integer> cfIdMap = new HashMap<Pair<String,
String>, Integer>();
     
-    public static final int getCfCount() 
+    public static final Map<Pair<String, String>, Integer> getCfIdMap()
     {
-        return idToName.size();
+        return Collections.unmodifiableMap(cfIdMap);    
     }
     
     // this gets called after initialization to make sure that id generation happens properly.
     public static final void fixMaxId()
     {
-        int maxId = Collections.max(idToName.keySet());
-        idGen.set(maxId + 1);
+        idGen.set(Collections.max(cfIdMap.values()) + 1);
     }
     
     public final String tableName;            // name of table which has this column family
@@ -85,7 +83,7 @@ public final class CFMetaData
     public CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator,
AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize)
     {
         this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize,
keyCacheSize, nextId());
-        idToName.put(cfId, cfName);
+        cfIdMap.put(new Pair<String, String>(tableName, cfName), cfId);
     }
 
     // a quick and dirty pretty printer for describing the column family...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Mar 11 17:51:45 2010
@@ -704,6 +704,7 @@ public class DatabaseDescriptor
             ex.initCause(e);
             throw ex;
         }
+        CFMetaData.fixMaxId();
     }
 
     public static IAuthenticator getAuthenticator()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 11 17:51:45 2010
@@ -322,7 +322,8 @@ public class ColumnFamilyStore implement
                     {
                         // if we're not writing to the commit log, we are replaying the log, so marking
                         // the log header with "you can discard anything written before the context" is not valid
-                        CommitLog.instance().discardCompletedSegments(table_, columnFamily_,
ctx);
+                        final int cfId = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).cfId;
+                        CommitLog.instance().discardCompletedSegments(cfId, ctx);
                     }
                 }
             });

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Mar 11 17:51:45 2010
@@ -255,11 +255,6 @@ public class Table 
         }, checkMs, checkMs);
     }
 
-    public int getColumnFamilyId(String columnFamily)
-    {
-        return DatabaseDescriptor.getTableDefinition(name).cfMetaData().get(columnFamily).cfId;
-    }
-
     /**
      * Selects the specified column family for the specified key.
     */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Mar 11 17:51:45 2010
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.db.commitlog;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
@@ -25,6 +28,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.concurrent.StageManager;
 
@@ -178,6 +182,7 @@ public class CommitLog
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
             BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(),
"r", bufferSize);
             final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader);
+            final Map<Pair<String, String>, Integer> cfIdMap = clHeader.getCfIdMap();
             /* seek to the lowest position where any CF has non-flushed data */
             int lowPos = CommitLogHeader.getLowestPosition(clHeader);
             if (lowPos == 0)
@@ -235,7 +240,7 @@ public class CommitLog
                         /* remove column families that have already been flushed before applying the rest */
                         for (ColumnFamily columnFamily : columnFamilies)
                         {
-                            int id = table.getColumnFamilyId(columnFamily.name());
+                            int id = cfIdMap.get(new Pair<String, String>(table.name,
columnFamily.name()));
                             if (!clHeader.isDirty(id) || entryLocation < clHeader.getPosition(id))
                             {
                                 rm.removeColumnFamily(columnFamily);
@@ -332,14 +337,13 @@ public class CommitLog
      * The bit flag associated with this column family is set in the
      * header and this is used to decide if the log file can be deleted.
     */
-    public void discardCompletedSegments(final String tableName, final String cf, final CommitLogSegment.CommitLogContext
context) throws IOException
+    public void discardCompletedSegments(final int cfId, final CommitLogSegment.CommitLogContext
context) throws IOException
     {
         Callable task = new Callable()
         {
             public Object call() throws IOException
             {
-                int id = Table.open(tableName).getColumnFamilyId(cf);
-                discardCompletedSegmentsInternal(context, id);
+                discardCompletedSegmentsInternal(context, cfId);
                 return null;
             }
         };

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Thu Mar 11 17:51:45 2010
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.utils.Pair;
 
 class CommitLogHeader
 {    
@@ -48,12 +49,15 @@ class CommitLogHeader
     }
 
     private Map<Integer, Integer> lastFlushedAt; // position at which each CF was last
flushed
-    private final int maxSerializedSize;
+    private final byte[] serializedCfMap; // serialized. only needed during commit log recovery.
+    private final int cfCount; // we keep this in case cfcount changes in the interim (size
of lastFlushedAt is not a good indication).
+    
+    private transient final int maxSerializedSize;
+    private transient Map<Pair<String, String>, Integer> cfIdMap; // only needed
during recovery. created from this.serializedCfMap.
     
     CommitLogHeader()
     {
-        lastFlushedAt = new HashMap<Integer, Integer>();
-        maxSerializedSize = 8 * CFMetaData.getCfCount();
+        this(new HashMap<Integer, Integer>(), serializeCfIdMap(CFMetaData.getCfIdMap()),
CFMetaData.getCfIdMap().size());
     }
     
     /*
@@ -61,11 +65,14 @@ class CommitLogHeader
      * also builds an index of position to column family
      * Id.
     */
-    private CommitLogHeader(Map<Integer, Integer> lastFlushedAt)
+    private CommitLogHeader(Map<Integer, Integer> lastFlushedAt, byte[] serializedCfMap,
int cfCount)
     {
-        assert lastFlushedAt.size() <= CFMetaData.getCfCount();
+        this.cfCount = cfCount;
         this.lastFlushedAt = lastFlushedAt;
-        maxSerializedSize = 8 * CFMetaData.getCfCount();
+        this.serializedCfMap = serializedCfMap;
+        assert lastFlushedAt.size() <= cfCount;
+        // (size of lastFlushedAt) + (size of map buf) + (size of cfCount int)
+        maxSerializedSize = (8 * cfCount + 4) + (serializedCfMap.length + 4) + (4);
     }
         
     boolean isDirty(int cfId)
@@ -93,6 +100,53 @@ class CommitLogHeader
     {
         return lastFlushedAt.isEmpty();
     }
+    
+    synchronized Map<Pair<String, String>, Integer> getCfIdMap()
+    {
+        if (cfIdMap != null)
+            return cfIdMap;
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedCfMap));
+        cfIdMap = new HashMap<Pair<String, String>, Integer>();
+        try
+        {
+            int sz = in.readInt();
+            for (int i = 0; i < sz; i++)
+            {
+                Pair<String, String> key = new Pair<String, String>(in.readUTF(),
in.readUTF());
+                cfIdMap.put(key, in.readInt());
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+        return cfIdMap;
+    }
+    
+    private static byte[] serializeCfIdMap(Map<Pair<String, String>, Integer>
map)
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(out);
+        
+        try
+        {
+            dos.writeInt(map.size());
+            for (Map.Entry<Pair<String, String>, Integer> entry : map.entrySet())
+            {
+                Pair<String, String> p = entry.getKey();
+                dos.writeUTF(p.left);
+                dos.writeUTF(p.right);
+                dos.writeInt(entry.getValue());
+            }
+            dos.close();
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+            
+        return out.toByteArray();
+    }
 
     byte[] toByteArray() throws IOException
     {
@@ -100,19 +154,20 @@ class CommitLogHeader
         DataOutputStream dos = new DataOutputStream(bos);        
         serializer.serialize(this, dos);
         byte[] src = bos.toByteArray();
-        assert src.length < maxSerializedSize;
+        assert src.length <= maxSerializedSize;
         byte[] dst = new byte[maxSerializedSize];
         System.arraycopy(src, 0, dst, 0, src.length);
         return dst;
     }
     
+    // we use cf ids. getting the cf names would be pretty expensive.
     public String toString()
     {
         StringBuilder sb = new StringBuilder("");
         sb.append("CLH(dirty+flushed={");
         for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
-        {
-            sb.append(CFMetaData.getName(entry.getKey())).append(": ").append(entry.getValue()).append(",
");
+        {       
+            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
         }
         sb.append("})");
         return sb.toString();
@@ -139,21 +194,28 @@ class CommitLogHeader
     {
         public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
         {
-            dos.writeInt(clHeader.lastFlushedAt.size());
+            assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+            dos.writeInt(clHeader.cfCount); // 4
+            dos.writeInt(clHeader.serializedCfMap.length); // 4
+            dos.write(clHeader.serializedCfMap); // colMap.length
+            dos.writeInt(clHeader.lastFlushedAt.size()); // 4
             for (Map.Entry<Integer, Integer> entry : clHeader.lastFlushedAt.entrySet())
             {
-                dos.writeInt(entry.getKey());
-                dos.writeInt(entry.getValue());
+                dos.writeInt(entry.getKey()); // 4
+                dos.writeInt(entry.getValue()); // 4
             }
         }
 
         public CommitLogHeader deserialize(DataInputStream dis) throws IOException
         {
-            int lfSz = dis.readInt();
+            int colCount = dis.readInt();
+            byte[] map = new byte[dis.readInt()];
+            dis.readFully(map);
+            int size = dis.readInt();
             Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>();
-            for (int i = 0; i < lfSz; i++)
+            for (int i = 0; i < size; i++)
                 lastFlushedAt.put(dis.readInt(), dis.readInt());
-            return new CommitLogHeader(lastFlushedAt);
+            return new CommitLogHeader(lastFlushedAt, map, colCount);
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=921958&r1=921957&r2=921958&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Thu Mar 11 17:51:45 2010
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -100,11 +101,22 @@ public class CommitLogSegment
             // update header
             for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
             {
-                int id = table.getColumnFamilyId(columnFamily.name());
-                if (!header.isDirty(id))
+                // we can ignore the serialized map in the header (and avoid deserializing it) since we know we are
+                // writing the cfs as they exist now.  check for null cfm in case a cl write goes through after the cf is
+                // defined but before a new segment is created.
+                CFMetaData cfm = DatabaseDescriptor.getTableDefinition(table.name).cfMetaData().get(columnFamily.name());
+                if (cfm == null)
                 {
-                    header.turnOn(id, logWriter.getFilePointer());
-                    seekAndWriteCommitLogHeader(header.toByteArray());
+                    logger.error("Attempted to write commit log entry for unrecognized column
family: " + columnFamily.name());
+                }
+                else
+                {
+                    int id = cfm.cfId;
+                    if (!header.isDirty(id))
+                    {
+                        header.turnOn(id, logWriter.getFilePointer());
+                        seekAndWriteCommitLogHeader(header.toByteArray());
+                    }
                 }
             }
 

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=921958&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Thu Mar 11 17:51:45 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public class CommitLogHeaderTest
+{
+    @Before
+    public void setup()
+    {
+        // data definitions must be loaded.
+        DatabaseDescriptor.getTables();
+    }
+    
+    @Test
+    public void testEmptyHeader()
+    {
+        CommitLogHeader clh = new CommitLogHeader();
+        assert CommitLogHeader.getLowestPosition(clh) == 0;
+    }
+    
+    @Test
+    public void lowestPositionWithZero()
+    {
+        // zero should never be the lowest position unless all positions are zero.
+        CommitLogHeader clh = new CommitLogHeader();
+        clh.turnOn(2, 34);
+        assert CommitLogHeader.getLowestPosition(clh) == 34;
+        clh.turnOn(100, 0);
+        assert CommitLogHeader.getLowestPosition(clh) == 34;
+        clh.turnOn(65, 2);
+        assert CommitLogHeader.getLowestPosition(clh) == 2;
+    }
+    
+    @Test
+    public void lowestPositionEmpty()
+    {
+        CommitLogHeader clh = new CommitLogHeader();
+        assert CommitLogHeader.getLowestPosition(clh) == 0;
+    }
+    
+    @Test
+    public void constantSize() throws IOException
+    {
+        CommitLogHeader clh = new CommitLogHeader();
+        clh.turnOn(2, 34);
+        byte[] one = clh.toByteArray();
+        
+        clh = new CommitLogHeader();
+        for (int i = 0; i < 5; i++)
+            clh.turnOn(i, 1000 * i);
+        byte[] two = clh.toByteArray();
+        
+        assert one.length == two.length;
+    }
+    
+    @Test
+    public void cfMapSerialization() throws IOException
+    {
+        Map<Pair<String, String>, Integer> map = CFMetaData.getCfIdMap();
+        CommitLogHeader clh = new CommitLogHeader();
+        assert clh.getCfIdMap().equals(map);
+    }
+}



Mime
View raw message