cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r758997 - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/ src/org/apache/cassandra/dht/ src/org/apache/cassandra/io/ src/org/apache/cassandra/service/ src/org/apache/cassandra/test/ test/org/apache/cassandra/io/
Date Fri, 27 Mar 2009 02:44:20 GMT
Author: jbellis
Date: Fri Mar 27 02:44:20 2009
New Revision: 758997

URL: http://svn.apache.org/viewvc?rev=758997&view=rev
Log:
Consolidates partitioning behavior in IPartitioner, so creating a new partitioner should only be a matter of implementing that interface.  All the external switch statements on PartitionerType have been folded into IPartitioner implementations.

SSTable is now the only part of the code that cares about the distinction between a 'raw' (client) key and a 'decorated' key.  Variables in that class have been named clientKey or decoratedKey to show which is which.  Others don't care, either because they only deal with decorated keys (SequenceFile) or only with client keys (everyone else).  As part of this, I've merged some overloaded methods with substantially duplicated code to simplify auditing these changes.

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
      - copied, changed from r758996, incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:20 2009
@@ -1269,39 +1269,10 @@
                 + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
         return result;
     }
-    
-    private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
-    {
-    	PartitionerType pType = StorageService.getPartitionerType();    	
-    	switch ( pType )
-    	{
-    		case OPHF:
-    			ssTable.append(key, bufOut);
-    			break;
-    			
-    	    default:
-    	    	String[] peices = key.split(":");
-    	    	key = peices[1];
-    	    	BigInteger hash = new BigInteger(peices[0]);
-    	    	ssTable.append(key, hash, bufOut);
-    	    	break;
-    	}
-    }
-    
-    private void doFill(BloomFilter bf, String key)
+
+    private void doFill(BloomFilter bf, String decoratedKey)
     {
-    	PartitionerType pType = StorageService.getPartitionerType();    	
-    	switch ( pType )
-    	{
-    		case OPHF:
-    			bf.fill(key);
-    			break;
-    			
-    	    default:
-    	    	String[] peices = key.split(":");    	    	
-    	    	bf.fill(peices[1]);
-    	    	break;
-    	}
+        bf.fill(StorageService.getPartitioner().undecorateKey(decoratedKey));
     }
     
     /*
@@ -1422,11 +1393,10 @@
 	                    	         
 	                    if ( ssTable == null )
 	                    {
-	                    	PartitionerType pType = StorageService.getPartitionerType();
-	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);	                    	
+	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName);	                    	
 	                    }
-	                    doWrite(ssTable, lastkey, bufOut);	                 
-	                    
+                        ssTable.append(lastkey, bufOut);
+
                         /* Fill the bloom filter with the key */
 	                    doFill(compactedBloomFilter, lastkey);                        
 	                    totalkeysWritten++;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 02:44:20 2009
@@ -101,23 +101,7 @@
 
     public int compareTo(FileStruct f)
     {
-    	int value = 0;
-        PartitionerType pType = StorageService.getPartitionerType();
-        switch( pType )
-        {
-            case OPHF:
-                value = key_.compareTo(f.key_);                    
-                break;
-                
-            default:
-            	String lhs = key_.split(":")[0];            
-                BigInteger b = new BigInteger(lhs);
-                String rhs = f.key_.split(":")[0];
-                BigInteger b2 = new BigInteger(rhs);
-                value = b.compareTo(b2);
-                break;
-        }
-        return value;
+        return -StorageService.getPartitioner().getReverseDecoratedKeyComparator().compare(key_, f.key_);
     }
     
     public void close() throws IOException

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:44:20 2009
@@ -43,8 +43,6 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.service.PartitionerType;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.LogUtil;
 
@@ -420,51 +418,9 @@
             return;
         }
 
-        PartitionerType pType = StorageService.getPartitionerType();
         String directory = DatabaseDescriptor.getDataFileLocation();
         String filename = cfStore.getNextFileName();
-        SSTable ssTable = new SSTable(directory, filename, pType);
-        switch (pType)
-        {
-            case OPHF:
-                flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
-                break;
-
-            default:
-                flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
-                break;
-        }
-    }
-
-    private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
-    {
-        /* List of primary keys in sorted order */
-        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        /* Use this BloomFilter to decide if a key exists in a SSTable */
-        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
-        for ( PrimaryKey pKey : pKeys )
-        {
-            buffer.reset();
-            ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
-            if ( columnFamily != null )
-            {
-                /* serialize the cf with column indexes */
-                ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
-                /* Now write the key and value to disk */
-                ssTable.append(pKey.key(), pKey.hash(), buffer);
-                bf.fill(pKey.key());
-                columnFamily.clear();
-            }
-        }
-        ssTable.close(bf);
-        cfStore.onMemtableFlush(cLogCtx);
-        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
-        buffer.close();
-    }
-
-    private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
-    {
+        SSTable ssTable = new SSTable(directory, filename);
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
         Collections.sort(keys);
         DataOutputBuffer buffer = new DataOutputBuffer();
@@ -488,5 +444,8 @@
         cfStore.onMemtableFlush(cLogCtx);
         cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
         buffer.close();
+
+        columnFamilies_.clear();
     }
+
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java Fri Mar 27 02:44:20 2009
@@ -146,6 +146,6 @@
 
 	public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
     {
-    	return ssTable.next( key, cf, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
+    	return ssTable.next( key, cf, null, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
     }
 }

Copied: incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java (from r758996, incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java?p2=incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java&p1=incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java&r1=758996&r2=758997&rev=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java Fri Mar 27 02:44:20 2009
@@ -16,11 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.service;
+package org.apache.cassandra.dht;
 
-import java.math.BigInteger;
+import java.util.Comparator;
 
 public interface IPartitioner
 {
-    public BigInteger hash(String key);
+    /** transform key to on-disk format s.t. keys are stored in node comparison order.
+     *  this lets bootstrap rip out parts of the sstable sequentially instead of doing random seeks. */
+    public String decorateKey(String key);
+
+    public String undecorateKey(String decoratedKey);
+
+    public Comparator<String> getReverseDecoratedKeyComparator();
+
+    public Token getTokenForKey(String key);
+
+    public Token getDefaultToken();
+
+    public Token.TokenFactory getTokenFactory();
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=758997&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java Fri Mar 27 02:44:20 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Comparator;
+import java.util.Random;
+
+public class OrderPreservingPartitioner implements IPartitioner
+{
+    private static final Comparator<String> comparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            return o2.compareTo(o1);
+        }
+    };
+
+    public String decorateKey(String key)
+    {
+        return key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey;
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return comparator;
+    }
+
+    public StringToken getDefaultToken()
+    {
+        String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+        Random r = new Random();
+        StringBuffer buffer = new StringBuffer();
+        for (int j = 0; j < 16; j++) {
+            buffer.append(chars.charAt(r.nextInt(chars.length())));
+        }
+        return new StringToken(buffer.toString());
+    }
+
+    private final Token.TokenFactory<String> tokenFactory = new Token.TokenFactory<String>() {
+        public byte[] toByteArray(Token<String> stringToken)
+        {
+            try
+            {
+                return stringToken.token.getBytes("UTF-8");
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public Token<String> fromByteArray(byte[] bytes)
+        {
+            try
+            {
+                return new StringToken(new String(bytes, "UTF-8"));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public Token<String> fromString(String string)
+        {
+            return new StringToken(string);
+        }
+    };
+
+    public Token.TokenFactory<String> getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    public Token getTokenForKey(String key)
+    {
+        return new StringToken(key);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java?rev=758997&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java Fri Mar 27 02:44:20 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * This class generates a MD5 hash of the key. It uses the standard technique
+ * used in all DHT's.
+ * 
+ * @author alakshman
+ * 
+ */
+public class RandomPartitioner implements IPartitioner
+{
+    private static final Comparator<String> comparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            BigInteger i1 = new BigInteger(o1.split(":")[0]);
+            BigInteger i2 = new BigInteger(o2.split(":")[0]);
+            return i2.compareTo(i1);
+        }
+    };
+
+    public BigInteger hash(String key)
+	{
+		return FBUtilities.hash(key);
+	}
+
+    public String decorateKey(String key)
+    {
+        return hash(key).toString() + ":" + key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey.split(":")[1];
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return comparator;
+    }
+
+    public BigIntegerToken getDefaultToken()
+    {
+        String guid = GuidGenerator.guid();
+        BigInteger token = FBUtilities.hash(guid);
+        if ( token.signum() == -1 )
+            token = token.multiply(BigInteger.valueOf(-1L));
+        return new BigIntegerToken(token);
+    }
+
+    private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
+        public byte[] toByteArray(Token<BigInteger> bigIntegerToken)
+        {
+            return bigIntegerToken.token.toByteArray();
+        }
+
+        public Token<BigInteger> fromByteArray(byte[] bytes)
+        {
+            return new BigIntegerToken(new BigInteger(bytes));
+        }
+
+        public Token<BigInteger> fromString(String string)
+        {
+            return new BigIntegerToken(new BigInteger(string));
+        }
+    };
+
+    public Token.TokenFactory<BigInteger> getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    public Token getTokenForKey(String key)
+    {
+        return new BigIntegerToken(FBUtilities.hash(key));
+    }
+}
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java Fri Mar 27 02:44:20 2009
@@ -92,31 +92,6 @@
 
     /**
      * This method dumps the next key/value into the DataOuputStream
-     * passed in.
-     *
-     * @param key key we are interested in.
-     * @param dos DataOutputStream that needs to be filled.
-     * @param section region of the file that needs to be read
-     * @throws IOException
-     * @return the number of bytes read.
-    */
-    public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
-
-    /**
-     * This method dumps the next key/value into the DataOuputStream
-     * passed in.
-     *
-     * @param key key we are interested in.
-     * @param dos DataOutputStream that needs to be filled.
-     * @param column name of the column in our format.
-     * @param section region of the file that needs to be read
-     * @throws IOException
-     * @return number of bytes that were read.
-    */
-    public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
-
-    /**
-     * This method dumps the next key/value into the DataOuputStream
      * passed in. Always use this method to query for application
      * specific data as it will have indexes.
      *
@@ -130,23 +105,9 @@
      * @return number of bytes read.
      *
     */
-    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
+    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
     
     /**
-     * This method dumps the next key/value into the DataOuputStream
-     * passed in.
-     *
-     * @param key key we are interested in.
-     * @param dos DataOutputStream that needs to be filled.
-     * @param column name of the column in our format.
-     * @param timeRange time range we are interested in.
-     * @param section region of the file that needs to be read
-     * @throws IOException
-     * @return number of bytes that were read.
-    */
-    public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
-
-    /**
      * Close the file after reading.
      * @throws IOException
      */

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java Fri Mar 27 02:44:20 2009
@@ -18,11 +18,8 @@
 
 package org.apache.cassandra.io;
 
-import java.io.File;
 import java.io.IOException;
 
-import org.apache.cassandra.db.PrimaryKey;
-
 
 /**
  * An interface for writing into the SequenceFile abstraction.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:20 2009
@@ -19,23 +19,19 @@
 package org.apache.cassandra.io;
 
 import java.io.*;
-import java.math.BigInteger;
-import java.nio.channels.FileChannel;
 import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.IPartitioner;
+import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.db.RowMutation;
+
 import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -138,60 +134,29 @@
     }
     
     /**
-     * This compares two strings and does it in reverse
-     * order.
-     * 
-     * @author alakshman
-     *
-     */
-    private static class OrderPreservingPartitionerComparator implements Comparator<String>
-    {
-        public int compare(String c1, String c2) 
-        {
-            return c2.compareTo(c1);
-        } 
-    }
-
-    /**
-     * This class compares two BigInteger's passes in
-     * as strings and does so in reverse order.
-     * @author alakshman
-     *
-     */
-    private static class RandomPartitionerComparator implements Comparator<String>
-    {
-        public int compare(String c1, String c2) 
-        {
-            BigInteger b1 = new BigInteger(c1);
-            BigInteger b2 = new BigInteger(c2);
-            return b2.compareTo(b1);
-        } 
-    }
-    
-    /**
      * This is a simple container for the index Key and its corresponding position
      * in the data file. Binary search is performed on a list of these objects
      * to lookup keys within the SSTable data file.
     */
     public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
     {
-        private String key_;
+        private final String decoratedKey;
         private long position_;
 
-        public KeyPositionInfo(String key)
+        public KeyPositionInfo(String decoratedKey)
         {
-            key_ = key;            
+            this.decoratedKey = decoratedKey;
         }
 
-        public KeyPositionInfo(String key, long position)
+        public KeyPositionInfo(String decoratedKey, long position)
         {
-            this(key);
+            this(decoratedKey);
             position_ = position;
         }
 
         public String key()
         {
-            return key_;
+            return decoratedKey;
         }
 
         public long position()
@@ -201,25 +166,13 @@
 
         public int compareTo(KeyPositionInfo kPosInfo)
         {
-            int value = 0;
-            PartitionerType pType = StorageService.getPartitionerType();
-            switch( pType )
-            {
-                case OPHF:
-                    value = key_.compareTo(kPosInfo.key_);                    
-                    break;
-                    
-                default:
-                    BigInteger b = new BigInteger(key_);
-                    value = b.compareTo( new BigInteger(kPosInfo.key_) );
-                    break;
-            }
-            return value;
+            IPartitioner p = StorageService.getPartitioner();
+            return -p.getReverseDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
         }
 
         public String toString()
         {
-        	return key_ + ":" + position_;
+        	return decoratedKey + ":" + position_;
         }
     }
     
@@ -304,7 +257,7 @@
         List<String> indexedKeys = new ArrayList<String>();
         for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
         {
-            indexedKeys.add(keyPositionInfo.key_);
+            indexedKeys.add(keyPositionInfo.decoratedKey);
         }
 
         Collections.sort(indexedKeys);
@@ -361,13 +314,13 @@
      * Determines if the given key is in the specified file. If the
      * key is not present then we skip processing this file.
     */
-    public static boolean isKeyInFile(String key, String filename)
+    public static boolean isKeyInFile(String clientKey, String filename)
     {
         boolean bVal = false;
         BloomFilter bf = bfs_.get(filename);
         if ( bf != null )
         {
-            bVal = bf.isPresent(key);
+            bVal = bf.isPresent(clientKey);
         }
         return bVal;
     }
@@ -400,48 +353,13 @@
     public SSTable(String directory, String filename) throws IOException
     {  
         dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";                
-        blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
-        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();        
-        // dataWriter_ = SequenceFile.writer(dataFile_);
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);        
-        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
-    } 
-    
-    private void initBlockIndex()
-    {
-        initBlockIndex(StorageService.getPartitionerType());
-    }
-    
-    private void initBlockIndex(PartitionerType pType)
-    {
-        switch ( pType )
-        {
-            case OPHF: 
-                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.OrderPreservingPartitionerComparator() );                
-               break;
-               
-            default:
-                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.RandomPartitionerComparator() );
-                break;
-        }
-    }
-    
-    /**
-     * This ctor is used for DB writes into the SSTable. Use this
-     * version to write to the SSTable.
-    */
-    public SSTable(String directory, String filename, PartitionerType pType) throws IOException
-    {        
-        dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
-        // dataWriter_ = SequenceFile.writer(dataFile_);
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);    
-        // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
-        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
-        /* set up the block index based on partition type */
-        initBlockIndex(pType);
+        blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
         blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
-    }
-    
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+        // dataWriter_ = SequenceFile.chksumWriter(dataFile_, 4*1024*1024);
+        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
+    } 
+
     private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
     {        
         /* read the position of the bloom filter */
@@ -458,8 +376,8 @@
         indexReader.next(bufOut);   
         bufOut.close();
         bufIn.reset(bufOut.getData(), bufOut.getLength());
-        String key = bufIn.readUTF();
-        if ( key.equals(SequenceFile.marker_) )
+        String clientKey = bufIn.readUTF();
+        if ( clientKey.equals(SequenceFile.marker_) )
         {
             /*
              * We are now reading the serialized Bloom Filter. We read
@@ -600,23 +518,23 @@
     /*
      * Seeks to the specified key on disk.
     */
-    public void touch(String key, boolean fData) throws IOException
+    public void touch(final String clientKey, boolean fData) throws IOException
     {
-        if ( touchCache_.containsKey(key) )
+        if (touchCache_.containsKey(dataFile_ + ":" + clientKey))
             return;
         
         IFileReader dataReader = SequenceFile.reader(dataFile_); 
         try
         {
         	/* Morph the key */
-        	key = morphKey(key);
-            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+            String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
+            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
             /* Get offset of key from block Index */
             dataReader.seek(fileCoordinate.end_);
-            BlockMetadata blockMetadata = dataReader.getBlockMetadata(key);
+            BlockMetadata blockMetadata = dataReader.getBlockMetadata(decoratedKey);
             if ( blockMetadata.position_ != -1L )
             {
-                touchCache_.put(dataFile_ + ":" + key, blockMetadata.position_);                  
+                touchCache_.put(dataFile_ + ":" + clientKey, blockMetadata.position_);
             } 
             
             if ( fData )
@@ -639,67 +557,34 @@
         }
     }
 
-    private long beforeAppend(String key) throws IOException
+    private long beforeAppend(String decoratedKey) throws IOException
     {
-    	if(key == null )
+    	if (decoratedKey == null )
             throw new IOException("Keys must not be null.");
-        if ( lastWrittenKey_ != null && key.compareTo(lastWrittenKey_) <= 0 )
+        Comparator<String> c = StorageService.getPartitioner().getReverseDecoratedKeyComparator();
+        if ( lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) <= 0 )
         {
             logger_.info("Last written key : " + lastWrittenKey_);
-            logger_.info("Current key : " + key);
+            logger_.info("Current key : " + decoratedKey);
             logger_.info("Writing into file " + dataFile_);
             throw new IOException("Keys must be written in ascending order.");
         }
-        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
-        return currentPosition;
-    }
-    
-    private long beforeAppend(BigInteger hash) throws IOException
-    {
-        if(hash == null )
-            throw new IOException("Keys must not be null.");
-        if ( lastWrittenKey_ != null )
-        {
-            BigInteger previousKey = new BigInteger(lastWrittenKey_);
-            if ( hash.compareTo(previousKey) <= 0 )
-            {
-                logger_.info("Last written key : " + previousKey);
-                logger_.info("Current key : " + hash);
-                logger_.info("Writing into file " + dataFile_);
-                throw new IOException("Keys must be written in ascending order.");
-            }
-        }
-        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
-        return currentPosition;
+        return (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
     }
 
-    private void afterAppend(String key, long position, long size) throws IOException
+    private void afterAppend(String decoratedKey, long position, long size) throws IOException
     {
         ++indexKeysWritten_;
-        lastWrittenKey_ = key;
-        blockIndex_.put(key, new BlockMetadata(position, size));
+        lastWrittenKey_ = decoratedKey;
+        blockIndex_.put(decoratedKey, new BlockMetadata(position, size));
         if ( indexKeysWritten_ == indexInterval_ )
         {
         	blockIndexes_.add(blockIndex_);
-        	blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
-            indexKeysWritten_ = 0;
-        }                
-    }
-    
-    private void afterAppend(BigInteger hash, long position, long size) throws IOException
-    {
-        ++indexKeysWritten_;
-        String key = hash.toString();
-        lastWrittenKey_ = key;
-        blockIndex_.put(key, new BlockMetadata(position, size));
-        if ( indexKeysWritten_ == indexInterval_ )
-        {
-            blockIndexes_.add(blockIndex_);
-            initBlockIndex();
+        	blockIndex_ = new TreeMap<String, BlockMetadata>(StorageService.getPartitioner().getReverseDecoratedKeyComparator());
             indexKeysWritten_ = 0;
         }                
     }
-    
+
     /**
      * Dumps all the block indicies for this SSTable
      * at the end of the file.
@@ -730,10 +615,10 @@
         Set<String> keys = blockIndex.keySet();                
         /* Number of keys in this block */
         bufOut.writeInt(keys.size());
-        for ( String key : keys )
+        for ( String decoratedKey : keys )
         {            
-            bufOut.writeUTF(key);
-            BlockMetadata blockMetadata = blockIndex.get(key);
+            bufOut.writeUTF(decoratedKey);
+            BlockMetadata blockMetadata = blockIndex.get(decoratedKey);
             /* position of the key as a relative offset */
             bufOut.writeLong(position - blockMetadata.position_);
             bufOut.writeLong(blockMetadata.size_);
@@ -752,37 +637,23 @@
         blockIndex.clear();        
     }
 
-    public void append(String key, DataOutputBuffer buffer) throws IOException
+    public void append(String clientKey, DataOutputBuffer buffer) throws IOException
     {
-        long currentPosition = beforeAppend(key);
-        dataWriter_.append(key, buffer);
-        afterAppend(key, currentPosition, buffer.getLength());
-    }
-    
-    public void append(String key, BigInteger hash, DataOutputBuffer buffer) throws IOException
-    {
-        long currentPosition = beforeAppend(hash);
-        /* Use as key - hash + ":" + key */
-        dataWriter_.append(hash + ":" + key, buffer);
-        afterAppend(hash, currentPosition, buffer.getLength());
+        String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
+        long currentPosition = beforeAppend(decoratedKey);
+        dataWriter_.append(decoratedKey, buffer);
+        afterAppend(decoratedKey, currentPosition, buffer.getLength());
     }
 
-    public void append(String key, byte[] value) throws IOException
-    {
-        long currentPosition = beforeAppend(key);
-        dataWriter_.append(key, value);
-        afterAppend(key, currentPosition, value.length );
-    }
-    
-    public void append(String key, BigInteger hash, byte[] value) throws IOException
+    public void append(String clientKey, byte[] value) throws IOException
     {
-        long currentPosition = beforeAppend(hash);
-        /* Use as key - hash + ":" + key */
-        dataWriter_.append(hash + ":" + key, value);
-        afterAppend(hash, currentPosition, value.length);
+        String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
+        long currentPosition = beforeAppend(decoratedKey);
+        dataWriter_.append(decoratedKey, value);
+        afterAppend(decoratedKey, currentPosition, value.length );
     }
 
-    private Coordinate getCoordinates(String key, IFileReader dataReader) throws IOException
+    private Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
     {
     	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
     	int size = (indexInfo == null) ? 0 : indexInfo.size();
@@ -790,7 +661,7 @@
     	long end = dataReader.getEOF();
         if ( size > 0 )
         {
-            int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
+            int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey));
             if ( index < 0 )
             {
                 /*
@@ -840,97 +711,41 @@
         return new Coordinate(start, end);
     }
     
-    /**
-     * Convert the application key into the appropriate application
-     * key based on the partition type.
-     * 
-     * @param key the application key
-     * @return the appropriate key based on partition mechanism
-    */
-    private String morphKey(String key)
+    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
     {
-        String internalKey = key;
-        PartitionerType pType = StorageService.getPartitionerType();
-        switch ( pType )
-        {
-            case OPHF:
-                break;
-                
-            default:
-                internalKey = FBUtilities.hash(key).toString();
-                break;
-        }
-        return internalKey;
+        return next(clientKey, cfName, columnNames, null);
     }
-    
-    public DataInputBuffer next(String key, String cf, List<String> cNames) throws IOException
-    {
-    	DataInputBuffer bufIn = null;        
-        IFileReader dataReader = null;
-        try
-        {
-            dataReader = SequenceFile.reader(dataFile_);
-            /* Morph key into actual key based on the partition type. */ 
-            key = morphKey(key);
-            Coordinate fileCoordinate = getCoordinates(key, dataReader);    
-            /*
-             * we have the position we have to read from in order to get the
-             * column family, get the column family and column(s) needed.
-            */          
-            bufIn = getData(dataReader, key, cf, cNames, fileCoordinate);
-        }
-        finally
-        {
-            if ( dataReader != null )
-            {
-                dataReader.close();
-            }
-        }
-        return bufIn;
-    }
-    
-    public DataInputBuffer next(String key, String columnName) throws IOException
+
+    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames, IndexHelper.TimeRange timeRange) throws IOException
     {
-        DataInputBuffer bufIn = null;
         IFileReader dataReader = null;
         try
         {
             dataReader = SequenceFile.reader(dataFile_);
             // dataReader = SequenceFile.chksumReader(dataFile_, 4*1024*1024);
-            /* Morph key into actual key based on the partition type. */ 
-            key = morphKey(key);
-            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+
+            /* Decorate the client key into its on-disk form using the configured partitioner. */
+            String decoratedKey = StorageService.getPartitioner().decorateKey(clientKey);
+            Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader);
             /*
              * we have the position we have to read from in order to get the
              * column family, get the column family and column(s) needed.
-            */            
-            bufIn = getData(dataReader, key, columnName, fileCoordinate);
-        }
-        finally
-        {
-            if ( dataReader != null )
+            */
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            DataInputBuffer bufIn = new DataInputBuffer();
+
+            long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, fileCoordinate);
+            if ( bytesRead != -1L )
             {
-                dataReader.close();
+                if ( bufOut.getLength() > 0 )
+                {
+                    bufIn.reset(bufOut.getData(), bufOut.getLength());
+                    /* read the key even though we do not use it */
+                    bufIn.readUTF();
+                    bufIn.readInt();
+                }
             }
-        }
-        return bufIn;
-    }
-    
-    public DataInputBuffer next(String key, String columnName, IndexHelper.TimeRange timeRange) throws IOException
-    {
-        DataInputBuffer bufIn = null;
-        IFileReader dataReader = null;
-        try
-        {
-            dataReader = SequenceFile.reader(dataFile_);
-            /* Morph key into actual key based on the partition type. */ 
-            key = morphKey(key);
-            Coordinate fileCoordinate = getCoordinates(key, dataReader);
-            /*
-             * we have the position we have to read from in order to get the
-             * column family, get the column family and column(s) needed.
-            */  
-            bufIn = getData(dataReader, key, columnName, timeRange, fileCoordinate);
+            return bufIn;
         }
         finally
         {
@@ -939,125 +754,14 @@
                 dataReader.close();
             }
         }
-        return bufIn;
-    }
-    
-    long getSeekPosition(String key, long start)
-    {
-        Long seekStart = touchCache_.get(dataFile_ + ":" + key);
-        if( seekStart != null)
-        {
-            return seekStart;
-        }
-        return start;
     }
-        
-    /*
-     * Get the data for the key from the position passed in. 
-    */
-    private DataInputBuffer getData(IFileReader dataReader, String key, String column, Coordinate section) throws IOException
-    {
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        DataInputBuffer bufIn = new DataInputBuffer();
-        
-        long bytesRead = dataReader.next(key, bufOut, column, section);
-        if ( bytesRead != -1L )
-        {
-            if ( bufOut.getLength() > 0 )
-            {                              
-                bufIn.reset(bufOut.getData(), bufOut.getLength());            
-                /* read the key even though we do not use it */
-                bufIn.readUTF();
-                bufIn.readInt();            
-            }
-        }
-        
-        return bufIn;
-    }
-    
-    private DataInputBuffer getData(IFileReader dataReader, String key, String cf, List<String> columns, Coordinate section) throws IOException
-    {
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        DataInputBuffer bufIn = new DataInputBuffer();
-                  
-        long bytesRead = dataReader.next(key, bufOut, cf, columns, section);
-        if ( bytesRead != -1L )
-        {
-            if ( bufOut.getLength() > 0 )
-            {                     
-                bufIn.reset(bufOut.getData(), bufOut.getLength());             
-                /* read the key even though we do not use it */
-                bufIn.readUTF();
-                bufIn.readInt();            
-            }        
-        }
-        return bufIn;
-    }
-    
-    /*
-     * Get the data for the key from the position passed in. 
-    */
-    private DataInputBuffer getData(IFileReader dataReader, String key, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
-    {
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        DataInputBuffer bufIn = new DataInputBuffer();
-                
-        try
-        {
-            dataReader.next(key, bufOut, column, timeRange, section);
-            if ( bufOut.getLength() > 0 )
-            {                              
-                bufIn.reset(bufOut.getData(), bufOut.getLength());            
-                /* read the key even though we do not use it */
-                bufIn.readUTF();
-                bufIn.readInt();            
-            }
-        }
-        catch ( IOException ex )
-        {
-            logger_.warn(LogUtil.throwableToString(ex));
-        }
-        return bufIn;
-    }
-    
-    /*
-     * Given a key we are interested in this method gets the
-     * closest index before the key on disk.
-     *
-     *  param @ key - key we are interested in.
-     *  return position of the closest index before the key
-     *  on disk or -1 if this key is not on disk.
-    */
-    private long getClosestIndexPositionToKeyOnDisk(String key)
+
+    public DataInputBuffer next(String clientKey, String columnFamilyColumn) throws IOException
     {
-        long position = -1L;
-        List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
-        int size = indexInfo.size();
-        int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
-        if ( index < 0 )
-        {
-            /*
-             * We are here which means that the requested
-             * key is not an index.
-            */
-            index = (++index)*(-1);
-            /* this means key is not present at all */
-            if ( index >= size )
-                return position;
-            /* a scan is in order. */
-            position = (index == 0) ? 0 : indexInfo.get(index - 1).position();
-        }
-        else
-        {
-            /*
-             * If we are here that means the key is in the index file
-             * and we can retrieve it w/o a scan. In reality we would
-             * like to have a retreive(key, fromPosition) but for now
-             * we use scan(start, start + 1) - a hack.
-            */
-            position = indexInfo.get(index).position();
-        }
-        return position;
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+        String columnFamilyName = values[0];
+        List<String> cnNames = (values.length == 1) ? null : Arrays.asList(values[1]);
+        return next(clientKey, columnFamilyName, cnNames);
     }
 
     public void close() throws IOException

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 02:44:20 2009
@@ -561,36 +561,8 @@
         public String getFileName()
         {
             return filename_;
-        }   
-        
-        /**
-         * Given the application key this method basically figures if
-         * the key is in the block. Key comparisons differ based on the
-         * partition function. In OPHF key is stored as is but in the
-         * case of a Random hash key used internally is hash(key):key.
-         * @param key which we are looking for
-         * @param in DataInput stream into which we are looking for the key.
-         * @return true if key is found and false otherwise.
-         * @throws IOException
-         */
-        protected boolean isKeyInBlock(String key, DataInput in) throws IOException
-        {
-            boolean bVal = false;            
-            String keyInBlock = in.readUTF();
-            PartitionerType pType = StorageService.getPartitionerType();
-            switch ( pType )
-            {
-                case OPHF:
-                    bVal = keyInBlock.equals(key);
-                    break;
-                    
-                default:                    
-                    bVal = keyInBlock.split(":")[0].equals(key);
-                    break;
-            }
-            return bVal;
         }
-       
+
         /**
          * Return the position of the given key from the block index.
          * @param key the key whose offset is to be extracted from the current block index
@@ -618,7 +590,7 @@
             for ( int i = 0; i < keys; ++i )
             {            
                 String keyInBlock = bufIn.readUTF();                
-                if ( keyInBlock.equals(key) )                
+                if ( keyInBlock.equals(key) )
                 {                	
                     position = bufIn.readLong();
                     break;
@@ -666,8 +638,9 @@
             /* Number of keys in the block. */
             int keys = bufIn.readInt();
             for ( int i = 0; i < keys; ++i )
-            {                
-                if ( isKeyInBlock(key, bufIn) )
+            {
+                String keyInBlock = bufIn.readUTF();
+                if (keyInBlock.equals(key))
                 {
                     long position = bufIn.readLong();
                     long dataSize = bufIn.readLong();
@@ -751,7 +724,7 @@
         {
             /* Goto the Block Index */
             seek(section.end_);
-            long position = getPositionFromBlockIndex(key);            
+            long position = getPositionFromBlockIndex(key);
             seek(position);                   
         }
         
@@ -797,9 +770,9 @@
             }
             return totalBytesRead;
         }
-        
+
         /**
-         * Reads the column name indexes if present. If the 
+         * Reads the column name indexes if present. If the
          * indexes are based on time then skip over them.
          * @param cfName
          * @return
@@ -808,13 +781,13 @@
         {
             /* check if we have an index */
             boolean hasColumnIndexes = file_.readBoolean();
-            int totalBytesRead = 1;            
+            int totalBytesRead = 1;
             /* if we do then deserialize the index */
             if(hasColumnIndexes)
-            {        
+            {
                 if ( DatabaseDescriptor.isTimeSortingEnabled(cfName) )
                 {
-                    /* read the index */                            
+                    /* read the index */
                     totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
                 }
                 else
@@ -1104,11 +1077,11 @@
          * @return total number of bytes read/considered
          *
         */
-        public long next(String key, DataOutputBuffer bufOut, String cf, List<String> columnNames, Coordinate section) throws IOException
+        public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
         {
-        	String[] values = RowMutation.getColumnAndColumnFamily(cf);
-    		String columnFamilyName = values[0];
-            List<String> cNames = new ArrayList<String>(columnNames);
+            assert timeRange == null || columnNames == null; // at most one may be non-null
+            
+            List<String> cNames = columnNames == null ? null : new ArrayList<String>(columnNames);
 
             long bytesRead = -1L;
             if ( isEOF() )
@@ -1116,8 +1089,8 @@
 
             seekTo(key, section);            
             /* note the position where the key starts */
-            long startPosition = file_.getFilePointer();            
-            String keyInDisk = readKeyFromDisk(file_);
+            long startPosition = file_.getFilePointer();
+            String keyInDisk = file_.readUTF();
             if ( keyInDisk != null )
             {
                 /*
@@ -1171,7 +1144,9 @@
                         
                         List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
                         /* read the column name indexes if present */
-                        int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);                        
+                        int totalBytesRead = (timeRange == null)
+                                           ? handleColumnNameIndexes(columnFamilyName, columnIndexList)
+                                           : handleColumnTimeIndexes(columnFamilyName, columnIndexList);
                     	dataSize -= totalBytesRead;
 
                         /* read the column family name */
@@ -1190,7 +1165,15 @@
                         /* sort the required list of columns */
                         Collections.sort(cNames);
                         /* get the various column ranges we have to read */
-                        List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
+                        List<IndexHelper.ColumnRange> columnRanges;
+                        if (timeRange == null)
+                        {
+                            columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
+                        }
+                        else
+                        {
+                            columnRanges = Arrays.asList(IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols));
+                        }
 
                         /* calculate the data size */
                         int numColsReturned = 0;
@@ -1280,65 +1263,8 @@
                 bytesRead = -1L;
             return bytesRead;
         }
-
-        /**
-         * This method dumps the next key/value into the DataOuputStream
-         * passed in.
-         *
-         * @param key - key we are interested in.
-         * @param dos - DataOutputStream that needs to be filled.
-         * @param section region of the file that needs to be read
-         * @return total number of bytes read/considered
-         */
-        public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException
-        {
-            long bytesRead = -1L;
-            if ( isEOF() )
-                return bytesRead;
-                   
-            seekTo(key, section);            
-            /* note the position where the key starts */
-            long startPosition = file_.getFilePointer(); 
-            String keyInDisk = readKeyFromDisk(file_);
-            if ( keyInDisk != null )
-            {
-                /*
-                 * If key on disk is greater than requested key
-                 * we can bail out since we exploit the property
-                 * of the SSTable format.
-                */
-                if ( keyInDisk.compareTo(key) > 0 )
-                    return bytesRead;
-
-                /*
-                 * If we found the key then we populate the buffer that
-                 * is passed in. If not then we skip over this key and
-                 * position ourselves to read the next one.
-                */
-                int dataSize = file_.readInt();
-                if ( keyInDisk.equals(key) )
-                {
-                    /* write the key into buffer */
-                    bufOut.writeUTF( keyInDisk );
-                    /* write data size into buffer */
-                    bufOut.writeInt(dataSize);
-                    /* write the data into buffer */
-                    bufOut.write(file_, dataSize);
-                }
-                else
-                {
-                    /* skip over data portion */
-                	file_.seek(dataSize + file_.getFilePointer());
-                }
-
-                long endPosition = file_.getFilePointer();
-                bytesRead = endPosition - startPosition;
-            }
-
-            return bytesRead;
-        }
     }
-    
+
     public static class Reader extends AbstractReader
     {
         Reader(String filename) throws IOException

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java Fri Mar 27 02:44:20 2009
@@ -19,8 +19,17 @@
 package org.apache.cassandra.service;
 
 import java.math.BigInteger;
+import java.util.Comparator;
 
 public interface IPartitioner
 {
     public BigInteger hash(String key);
+
+    /** Transforms a key to its on-disk format such that keys are stored in node comparison order.
+     *  This lets bootstrap read out parts of the sstable sequentially instead of doing random seeks. */
+    public String decorateKey(String key);
+
+    public String undecorateKey(String decoratedKey);
+
+    public Comparator<String> getReverseDecoratedKeyComparator();
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java Fri Mar 27 02:44:20 2009
@@ -19,12 +19,21 @@
 package org.apache.cassandra.service;
 
 import java.math.BigInteger;
+import java.util.Comparator;
+import java.text.Collator;
 
 public class OrderPreservingHashPartitioner implements IPartitioner
 {
     private final static int maxKeyHashLength_ = 24;
     private static final BigInteger prime_ = BigInteger.valueOf(31);
-    
+    private static final Comparator<String> comparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            return o2.compareTo(o1);
+        }
+    };
+
+
     public BigInteger hash(String key)
     {
         BigInteger h = BigInteger.ZERO;
@@ -39,4 +48,19 @@
         }
         return h;
     }
+
+    public String decorateKey(String key)
+    {
+        return key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey;
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return comparator;
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java Fri Mar 27 02:44:20 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import java.math.BigInteger;
+import java.util.Comparator;
 
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -31,8 +32,32 @@
  */
 public class RandomPartitioner implements IPartitioner
 {
-	public BigInteger hash(String key)
+    private static final Comparator<String> comparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            BigInteger i1 = new BigInteger(o1.split(":")[0]);
+            BigInteger i2 = new BigInteger(o2.split(":")[0]);
+            return i2.compareTo(i1);
+        }
+    };
+
+    public BigInteger hash(String key)
 	{
 		return FBUtilities.hash(key);
 	}
+
+    public String decorateKey(String key)
+    {
+        return hash(key).toString() + ":" + key;
+    }
+
+    public String undecorateKey(String decoratedKey)
+    {
+        return decoratedKey.split(":")[1];
+    }
+
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return comparator;
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:20 2009
@@ -183,6 +183,10 @@
     {
         return partitioner_.hash(key);
     }
+
+    public static IPartitioner getPartitioner() {
+        return partitioner_;
+    }
     
     public static enum BootstrapMode
     {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java Fri Mar 27 02:44:20 2009
@@ -20,19 +20,12 @@
 
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.PrimaryKey;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.utils.BloomFilter;
 
 
@@ -59,71 +52,6 @@
         ssTable.close(bf);
     }
     
-    private static void hashSSTableWrite() throws Throwable
-    {        
-        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>();                
-        byte[] bytes = new byte[64*1024];
-        Random random = new Random();
-        for ( int i = 100; i < 1000; ++i )
-        {
-            String key = Integer.toString(i);
-            ColumnFamily cf = new ColumnFamily("Test", "Standard");                      
-            // random.nextBytes(bytes);
-            cf.addColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
-            columnFamilies.put(key, cf);
-        } 
-        flushForRandomPartitioner(columnFamilies);
-    }
-    
-    private static void flushForRandomPartitioner(Map<String, ColumnFamily> columnFamilies) throws Throwable
-    {
-        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1", PartitionerType.RANDOM);
-        /* List of primary keys in sorted order */
-        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies.keySet() );
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        /* Use this BloomFilter to decide if a key exists in a SSTable */
-        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
-        for ( PrimaryKey pKey : pKeys )
-        {
-            buffer.reset();
-            ColumnFamily columnFamily = columnFamilies.get(pKey.key());
-            if ( columnFamily != null )
-            {
-                /* serialize the cf with column indexes */
-                ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
-                /* Now write the key and value to disk */
-                ssTable.append(pKey.key(), pKey.hash(), buffer);
-                bf.fill(pKey.key());                
-            }
-        }
-        ssTable.close(bf);
-    }
-    
-    private static void readSSTable() throws Throwable
-    {
-        SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra\\Table-Test-1-Data.db");  
-        for ( int i = 100; i < 1000; ++i )
-        {
-            String key = Integer.toString(i);            
-            DataInputBuffer bufIn = ssTable.next(key, "Test:C");
-            ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
-            if ( cf != null )
-            {            
-                System.out.println("KEY:" + key);
-                System.out.println(cf.name());
-                Collection<IColumn> columns = cf.getAllColumns();
-                for ( IColumn column : columns )
-                {
-                    System.out.println(column.name());
-                }
-            }
-            else
-            {
-                System.out.println("CF doesn't exist for key " + key);
-            }                             
-        }
-    }
-    
     public static void main(String[] args) throws Throwable
     {
         BloomFilter bf = new BloomFilter(1024*1024, 15);

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/io/SSTableTest.java?rev=758997&r1=758996&r2=758997&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/io/SSTableTest.java Fri Mar 27 02:44:20 2009
@@ -1,6 +1,7 @@
 package org.apache.cassandra.io;
 
 import org.apache.cassandra.ServerTest;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.FileStruct;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.commons.collections.CollectionUtils;
@@ -28,8 +29,8 @@
         bf.fill(key);
         ssTable.close(bf);
 
-        // TODO append/next are not symmetrical ops anymore
-        
+        // TODO this is broken because SST/SequenceFile now assume that only CFs are written
+
         // verify
         ssTable = new SSTable(f.getPath() + "-Data.db");
         DataInputBuffer bufIn = ssTable.next(key, "Test:C");
@@ -58,7 +59,7 @@
         }
         ssTable.close(bf);
 
-        // TODO append/next are not symmetrical ops anymore
+        // TODO this is broken because SST/SequenceFile now assume that only CFs are written
 
         // verify
         List<String> keys = new ArrayList(map.keySet());



Mime
View raw message