Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 62809 invoked from network); 11 Mar 2010 17:52:43 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 11 Mar 2010 17:52:43 -0000 Received: (qmail 49060 invoked by uid 500); 11 Mar 2010 17:52:09 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 48993 invoked by uid 500); 11 Mar 2010 17:52:09 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 48985 invoked by uid 99); 11 Mar 2010 17:52:08 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Mar 2010 17:52:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Mar 2010 17:52:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B142F23888D1; Thu, 11 Mar 2010 17:51:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r921958 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ test/unit/org/apache/cassandra/db/commitlog/ Date: Thu, 11 Mar 2010 17:51:45 -0000 To: cassandra-commits@incubator.apache.org From: gdusbabek@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100311175145.B142F23888D1@eris.apache.org> Author: gdusbabek Date: Thu Mar 11 17:51:45 2010 New Revision: 921958 URL: http://svn.apache.org/viewvc?rev=921958&view=rev Log: store cfid map in CLH. 
Patch by Gary Dusbabek, reviewed by Jonathan Ellis. Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Mar 11 17:51:45 2010 @@ -18,8 +18,12 @@ package org.apache.cassandra.config; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -39,24 +43,18 @@ public final class CFMetaData public final static double DEFAULT_ROW_CACHE_SIZE = 0.0; private static final AtomicInteger idGen = new AtomicInteger(0); - private static final Map idToName = new HashMap(); - // this only gets used by 
a toString method. - public static final String getName(int id) - { - return idToName.get(id); - } + private static final Map<Pair<String, String>, Integer> cfIdMap = new HashMap<Pair<String, String>, Integer>(); - public static final int getCfCount() + public static final Map<Pair<String, String>, Integer> getCfIdMap() { - return idToName.size(); + return Collections.unmodifiableMap(cfIdMap); } // this gets called after initialization to make sure that id generation happens properly. public static final void fixMaxId() { - int maxId = Collections.max(idToName.keySet()); - idGen.set(maxId + 1); + idGen.set(Collections.max(cfIdMap.values()) + 1); } public final String tableName; // name of table which has this column family @@ -85,7 +83,7 @@ public final class CFMetaData public CFMetaData(String tableName, String cfName, String columnType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, double keyCacheSize) { this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, nextId()); - idToName.put(cfId, cfName); + cfIdMap.put(new Pair<String, String>(tableName, cfName), cfId); } // a quick and dirty pretty printer for describing the column family... 
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Mar 11 17:51:45 2010 @@ -704,6 +704,7 @@ public class DatabaseDescriptor ex.initCause(e); throw ex; } + CFMetaData.fixMaxId(); } public static IAuthenticator getAuthenticator() Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 11 17:51:45 2010 @@ -322,7 +322,8 @@ public class ColumnFamilyStore implement { // if we're not writing to the commit log, we are replaying the log, so marking // the log header with "you can discard anything written before the context" is not valid - CommitLog.instance().discardCompletedSegments(table_, columnFamily_, ctx); + final int cfId = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).cfId; + CommitLog.instance().discardCompletedSegments(cfId, ctx); } } }); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=921958&r1=921957&r2=921958&view=diff 
============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Mar 11 17:51:45 2010 @@ -255,11 +255,6 @@ public class Table }, checkMs, checkMs); } - public int getColumnFamilyId(String columnFamily) - { - return DatabaseDescriptor.getTableDefinition(name).cfMetaData().get(columnFamily).cfId; - } - /** * Selects the specified column family for the specified key. */ Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Mar 11 17:51:45 2010 @@ -18,6 +18,9 @@ package org.apache.cassandra.db.commitlog; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.RowMutation; @@ -25,6 +28,7 @@ import org.apache.cassandra.db.Table; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.DeletionService; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.concurrent.StageManager; @@ -178,6 +182,7 @@ public class CommitLog int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024); BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", 
bufferSize); final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader); + final Map<Pair<String, String>, Integer> cfIdMap = clHeader.getCfIdMap(); /* seek to the lowest position where any CF has non-flushed data */ int lowPos = CommitLogHeader.getLowestPosition(clHeader); if (lowPos == 0) @@ -235,7 +240,7 @@ public class CommitLog /* remove column families that have already been flushed before applying the rest */ for (ColumnFamily columnFamily : columnFamilies) { - int id = table.getColumnFamilyId(columnFamily.name()); + int id = cfIdMap.get(new Pair<String, String>(table.name, columnFamily.name())); if (!clHeader.isDirty(id) || entryLocation < clHeader.getPosition(id)) { rm.removeColumnFamily(columnFamily); @@ -332,14 +337,13 @@ public class CommitLog * The bit flag associated with this column family is set in the * header and this is used to decide if the log file can be deleted. */ - public void discardCompletedSegments(final String tableName, final String cf, final CommitLogSegment.CommitLogContext context) throws IOException + public void discardCompletedSegments(final int cfId, final CommitLogSegment.CommitLogContext context) throws IOException { Callable task = new Callable() { public Object call() throws IOException { - int id = Table.open(tableName).getColumnFamilyId(cf); - discardCompletedSegmentsInternal(context, id); + discardCompletedSegmentsInternal(context, cfId); return null; } }; Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Thu Mar 11 17:51:45 2010 @@ -27,6 +27,7 @@ import 
java.util.Map; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.utils.Pair; class CommitLogHeader { @@ -48,12 +49,15 @@ class CommitLogHeader } private Map<Integer, Integer> lastFlushedAt; // position at which each CF was last flushed - private final int maxSerializedSize; + private final byte[] serializedCfMap; // serialized. only needed during commit log recovery. + private final int cfCount; // we keep this in case cfcount changes in the interim (size of lastFlushedAt is not a good indication). + + private transient final int maxSerializedSize; + private transient Map<Pair<String, String>, Integer> cfIdMap; // only needed during recovery. created from this.serializedCfMap. CommitLogHeader() { - lastFlushedAt = new HashMap<Integer, Integer>(); - maxSerializedSize = 8 * CFMetaData.getCfCount(); + this(new HashMap<Integer, Integer>(), serializeCfIdMap(CFMetaData.getCfIdMap()), CFMetaData.getCfIdMap().size()); } /* @@ -61,11 +65,14 @@ class CommitLogHeader * also builds an index of position to column family * Id. 
*/ - private CommitLogHeader(Map<Integer, Integer> lastFlushedAt) + private CommitLogHeader(Map<Integer, Integer> lastFlushedAt, byte[] serializedCfMap, int cfCount) { - assert lastFlushedAt.size() <= CFMetaData.getCfCount(); + this.cfCount = cfCount; this.lastFlushedAt = lastFlushedAt; - maxSerializedSize = 8 * CFMetaData.getCfCount(); + this.serializedCfMap = serializedCfMap; + assert lastFlushedAt.size() <= cfCount; + // (size of lastFlushedAt) + (size of map buf) + (size of cfCount int) + maxSerializedSize = (8 * cfCount + 4) + (serializedCfMap.length + 4) + (4); } boolean isDirty(int cfId) @@ -93,6 +100,53 @@ class CommitLogHeader { return lastFlushedAt.isEmpty(); } + + synchronized Map<Pair<String, String>, Integer> getCfIdMap() + { + if (cfIdMap != null) + return cfIdMap; + DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedCfMap)); + cfIdMap = new HashMap<Pair<String, String>, Integer>(); + try + { + int sz = in.readInt(); + for (int i = 0; i < sz; i++) + { + Pair<String, String> key = new Pair<String, String>(in.readUTF(), in.readUTF()); + cfIdMap.put(key, in.readInt()); + } + } + catch (IOException ex) + { + throw new IOError(ex); + } + return cfIdMap; + } + + private static byte[] serializeCfIdMap(Map<Pair<String, String>, Integer> map) + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + + try + { + dos.writeInt(map.size()); + for (Map.Entry<Pair<String, String>, Integer> entry : map.entrySet()) + { + Pair<String, String> p = entry.getKey(); + dos.writeUTF(p.left); + dos.writeUTF(p.right); + dos.writeInt(entry.getValue()); + } + dos.close(); + } + catch (IOException ex) + { + throw new IOError(ex); + } + + return out.toByteArray(); + } byte[] toByteArray() throws IOException { @@ -100,19 +154,20 @@ class CommitLogHeader DataOutputStream dos = new DataOutputStream(bos); serializer.serialize(this, dos); byte[] src = bos.toByteArray(); - assert src.length < maxSerializedSize; + assert src.length <= maxSerializedSize; byte[] dst = new byte[maxSerializedSize]; System.arraycopy(src, 0, dst, 0, src.length); return dst; } + // we use cf 
ids. getting the cf names would be pretty pretty expensive. public String toString() { StringBuilder sb = new StringBuilder(""); sb.append("CLH(dirty+flushed={"); for (Map.Entry entry : lastFlushedAt.entrySet()) - { - sb.append(CFMetaData.getName(entry.getKey())).append(": ").append(entry.getValue()).append(", "); + { + sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", "); } sb.append("})"); return sb.toString(); @@ -139,21 +194,28 @@ class CommitLogHeader { public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException { - dos.writeInt(clHeader.lastFlushedAt.size()); + assert clHeader.lastFlushedAt.size() <= clHeader.cfCount; + dos.writeInt(clHeader.cfCount); // 4 + dos.writeInt(clHeader.serializedCfMap.length); // 4 + dos.write(clHeader.serializedCfMap); // colMap.length + dos.writeInt(clHeader.lastFlushedAt.size()); // 4 for (Map.Entry entry : clHeader.lastFlushedAt.entrySet()) { - dos.writeInt(entry.getKey()); - dos.writeInt(entry.getValue()); + dos.writeInt(entry.getKey()); // 4 + dos.writeInt(entry.getValue()); // 4 } } public CommitLogHeader deserialize(DataInputStream dis) throws IOException { - int lfSz = dis.readInt(); + int colCount = dis.readInt(); + byte[] map = new byte[dis.readInt()]; + dis.readFully(map); + int size = dis.readInt(); Map lastFlushedAt = new HashMap(); - for (int i = 0; i < lfSz; i++) + for (int i = 0; i < size; i++) lastFlushedAt.put(dis.readInt(), dis.readInt()); - return new CommitLogHeader(lastFlushedAt); + return new CommitLogHeader(lastFlushedAt, map, colCount); } } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=921958&r1=921957&r2=921958&view=diff ============================================================================== --- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Thu Mar 11 17:51:45 2010 @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.zip.CRC32; import java.util.zip.Checksum; +import org.apache.cassandra.config.CFMetaData; import org.apache.log4j.Logger; import org.apache.cassandra.config.DatabaseDescriptor; @@ -100,11 +101,22 @@ public class CommitLogSegment // update header for (ColumnFamily columnFamily : rowMutation.getColumnFamilies()) { - int id = table.getColumnFamilyId(columnFamily.name()); - if (!header.isDirty(id)) + // we can ignore the serialized map in the header (and avoid deserializing it) since we know we are + // writing the cfs as they exist now. check for null cfm in case a cl write goes through after the cf is + // defined but before a new segment is created. + CFMetaData cfm = DatabaseDescriptor.getTableDefinition(table.name).cfMetaData().get(columnFamily.name()); + if (cfm == null) { - header.turnOn(id, logWriter.getFilePointer()); - seekAndWriteCommitLogHeader(header.toByteArray()); + logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.name()); + } + else + { + int id = cfm.cfId; + if (!header.isDirty(id)) + { + header.turnOn(id, logWriter.getFilePointer()); + seekAndWriteCommitLogHeader(header.toByteArray()); + } } } Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=921958&view=auto ============================================================================== --- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (added) +++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java 
Thu Mar 11 17:51:45 2010 @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Pair; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public class CommitLogHeaderTest +{ + @Before + public void setup() + { + // data definitions must be loaded. + DatabaseDescriptor.getTables(); + } + + @Test + public void testEmptyHeader() + { + CommitLogHeader clh = new CommitLogHeader(); + assert CommitLogHeader.getLowestPosition(clh) == 0; + } + + @Test + public void lowestPositionWithZero() + { + // zero should never be the lowest position unless all positions are zero. 
+ CommitLogHeader clh = new CommitLogHeader(); + clh.turnOn(2, 34); + assert CommitLogHeader.getLowestPosition(clh) == 34; + clh.turnOn(100, 0); + assert CommitLogHeader.getLowestPosition(clh) == 34; + clh.turnOn(65, 2); + assert CommitLogHeader.getLowestPosition(clh) == 2; + } + + @Test + public void lowestPositionEmpty() + { + CommitLogHeader clh = new CommitLogHeader(); + assert CommitLogHeader.getLowestPosition(clh) == 0; + } + + @Test + public void constantSize() throws IOException + { + CommitLogHeader clh = new CommitLogHeader(); + clh.turnOn(2, 34); + byte[] one = clh.toByteArray(); + + clh = new CommitLogHeader(); + for (int i = 0; i < 5; i++) + clh.turnOn(i, 1000 * i); + byte[] two = clh.toByteArray(); + + assert one.length == two.length; + } + + @Test + public void cfMapSerialization() throws IOException + { + Map<Pair<String, String>, Integer> map = CFMetaData.getCfIdMap(); + CommitLogHeader clh = new CommitLogHeader(); + assert clh.getCfIdMap().equals(map); + } +}