cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r773728 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java io/SSTable.java
Date Mon, 11 May 2009 23:52:11 GMT
Author: jbellis
Date: Mon May 11 23:52:10 2009
New Revision: 773728

URL: http://svn.apache.org/viewvc?rev=773728&view=rev
Log:
reformat.  this is just whitespace changes, automated by the IDE.  patch by jbellis

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773728&r1=773727&r2=773728&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:52:10 2009
@@ -64,14 +64,14 @@
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
-    private static final int BUFSIZE = 128*1024*1024;
+    private static final int BUFSIZE = 128 * 1024 * 1024;
     private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
     private final String table_;
     public final String columnFamily_;
     private final boolean isSuper_;
-    
+
     private volatile Integer memtableSwitchCount = 0;
 
     /* This is used to generate the next index for a SSTable */
@@ -113,7 +113,7 @@
          */
         List<Integer> indices = new ArrayList<Integer>();
         String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
-        for ( String directory : dataFileDirectories )
+        for (String directory : dataFileDirectories)
         {
             File fileDir = new File(directory);
             File[] files = fileDir.listFiles();
@@ -123,7 +123,7 @@
                 String[] tblCfName = getTableAndColumnFamilyName(filename);
 
                 if (tblCfName[0].equals(table)
-                        && tblCfName[1].equals(columnFamily))
+                    && tblCfName[1].equals(columnFamily))
                 {
                     int index = getIndexFromFileName(filename);
                     indices.add(index);
@@ -154,23 +154,23 @@
         /* Do major compaction */
         List<File> ssTables = new ArrayList<File>();
         String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
-        for ( String directory : dataFileDirectories )
+        for (String directory : dataFileDirectories)
         {
             File fileDir = new File(directory);
             File[] files = fileDir.listFiles();
             for (File file : files)
             {
                 String filename = file.getName();
-                if(((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_)) ) && (filename.contains(columnFamily_)))
+                if (((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_))) && (filename.contains(columnFamily_)))
                 {
-                	file.delete();
-                	continue;
+                    file.delete();
+                    continue;
                 }
-                
+
                 String[] tblCfName = getTableAndColumnFamilyName(filename);
                 if (tblCfName[0].equals(table_)
-                        && tblCfName[1].equals(columnFamily_)
-                        && filename.contains("-Data.db"))
+                    && tblCfName[1].equals(columnFamily_)
+                    && filename.contains("-Data.db"))
                 {
                     ssTables.add(file.getAbsoluteFile());
                 }
@@ -189,17 +189,19 @@
         SSTable.onStart(filenames);
         logger_.debug("Submitting a major compaction task ...");
         MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
-        if(columnFamily_.equals(Table.hints_))
+        if (columnFamily_.equals(Table.hints_))
         {
-        	HintedHandOffManager.instance().submit(this);
+            HintedHandOffManager.instance().submit(this);
         }
         // TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
         MinorCompactionManager.instance().submitPeriodicCompaction(this);
-        
+
         /* submit periodic flusher if required */
         int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
         if (flushPeriod > 0)
+        {
             PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
+        }
     }
 
     List<String> getAllSSTablesOnDisk()
@@ -222,7 +224,7 @@
          * no files on disk we do not want to display
          * something ugly on the admin page.
         */
-        if ( ssTables_.size() == 0 )
+        if (ssTables_.size() == 0)
         {
             return sb.toString();
         }
@@ -231,7 +233,7 @@
         sb.append("Number of files on disk : " + ssTables_.size());
         sb.append(newLineSeparator);
         double totalSpace = 0d;
-        for ( String file : ssTables_ )
+        for (String file : ssTables_)
         {
             File f = new File(file);
             totalSpace += f.length();
@@ -250,14 +252,14 @@
     */
     void addToList(String file)
     {
-    	lock_.writeLock().lock();
+        lock_.writeLock().lock();
         try
         {
             ssTables_.add(file);
         }
         finally
         {
-        	lock_.writeLock().unlock();
+            lock_.writeLock().unlock();
         }
     }
 
@@ -275,8 +277,10 @@
                  * is present in the BloomFilter. If not continue to the next file.
                 */
                 boolean bVal = SSTable.isKeyInFile(key, file);
-                if ( !bVal )
+                if (!bVal)
+                {
                     continue;
+                }
                 SSTable ssTable = new SSTable(file, StorageService.getPartitioner());
                 ssTable.touch(key, fData);
             }
@@ -292,26 +296,32 @@
      * for the process to complete by waiting on a future pointer.
     */
     boolean forceCompaction(List<Range> ranges, EndPoint target, long skip, List<String> fileList)
-    {        
-    	Future<Boolean> futurePtr = null;
-    	if( ranges != null)
-    		futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
-    	else
-    		MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
-    	
+    {
+        Future<Boolean> futurePtr = null;
+        if (ranges != null)
+        {
+            futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
+        }
+        else
+        {
+            MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
+        }
+
         boolean result = true;
         try
         {
             /* Waiting for the compaction to complete. */
-        	if(futurePtr != null)
-        		result = futurePtr.get();
+            if (futurePtr != null)
+            {
+                result = futurePtr.get();
+            }
             logger_.debug("Done forcing compaction ...");
         }
         catch (ExecutionException ex)
         {
             logger_.debug(LogUtil.throwableToString(ex));
         }
-        catch ( InterruptedException ex2 )
+        catch (InterruptedException ex2)
         {
             logger_.debug(LogUtil.throwableToString(ex2));
         }
@@ -331,7 +341,9 @@
         while (st.hasMoreElements())
         {
             if (i == 0)
+            {
                 values[i] = (String) st.nextElement();
+            }
             else if (i == 1)
             {
                 values[i] = (String) st.nextElement();
@@ -360,7 +372,9 @@
         {
             index = (String) st.nextElement();
             if (i == (count - 2))
+            {
                 break;
+            }
             ++i;
         }
         return Integer.parseInt(index);
@@ -368,8 +382,8 @@
 
     String getNextFileName()
     {
-    	// Psuedo increment so that we do not generate consecutive numbers 
-    	fileIndexGenerator_.incrementAndGet();
+        // Psuedo increment so that we do not generate consecutive numbers
+        fileIndexGenerator_.incrementAndGet();
         return table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
     }
 
@@ -378,8 +392,8 @@
      */
     String getTempFileName()
     {
-    	// Psuedo increment so that we do not generate consecutive numbers 
-    	fileIndexGenerator_.incrementAndGet();
+        // Psuedo increment so that we do not generate consecutive numbers
+        fileIndexGenerator_.incrementAndGet();
         return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet();
     }
 
@@ -390,32 +404,34 @@
      * Since we do not generate consecutive numbers hence the lowest file number
      * can just be incremented to generate the next file. 
      */
-    String getTempFileName( List<String> files)
+    String getTempFileName(List<String> files)
     {
-    	int lowestIndex;
-    	int index;
-    	Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
-    	
-    	if( files.size() <= 1)
-    		return null;
-    	lowestIndex = getIndexFromFileName(files.get(0));
-   		
-   		index = lowestIndex + 1 ;
+        int lowestIndex;
+        int index;
+        Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+
+        if (files.size() <= 1)
+        {
+            return null;
+        }
+        lowestIndex = getIndexFromFileName(files.get(0));
+
+        index = lowestIndex + 1;
 
         return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
     }
 
-    
+
     /*
-     * This version is used only on start up when we are recovering from logs.
-     * In the future we may want to parellelize the log processing for a table
-     * by having a thread per log file present for recovery. Re-visit at that
-     * time.
-     */
+    * This version is used only on start up when we are recovering from logs.
+    * In the future we may want to parellelize the log processing for a table
+    * by having a thread per log file present for recovery. Re-visit at that
+    * time.
+    */
     void switchMemtable()
     {
         memtable_.set(new Memtable(table_, columnFamily_));
-        
+
         if (memtableSwitchCount == Integer.MAX_VALUE)
         {
             memtableSwitchCount = 0;
@@ -431,7 +447,7 @@
      */
     void switchBinaryMemtable(String key, byte[] buffer) throws IOException
     {
-        binaryMemtable_.set( new BinaryMemtable(table_, columnFamily_) );
+        binaryMemtable_.set(new BinaryMemtable(table_, columnFamily_));
         binaryMemtable_.get().put(key, buffer);
     }
 
@@ -463,11 +479,11 @@
     }
 
     /**
-     * Insert/Update the column family for this key. 
-     * param @ lock - lock that needs to be used. 
-     * param @ key - key for update/insert 
+     * Insert/Update the column family for this key.
+     * param @ lock - lock that needs to be used.
+     * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
-    */
+     */
     void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
             throws IOException
     {
@@ -504,7 +520,6 @@
     }
 
     /**
-     *
      * Get the column family in the most efficient order.
      * 1. Memtable
      * 2. Sorted list of files
@@ -531,6 +546,7 @@
     /**
      * Fetch from disk files and go in sorted order  to be efficient
      * This fn exits as soon as the required data is found.
+     *
      * @param key
      * @param cf
      * @param columnFamilies
@@ -540,8 +556,8 @@
     private void getColumnFamilyFromDisk(String key, String cf, List<ColumnFamily> columnFamilies, IFilter filter) throws IOException
     {
         /* Scan the SSTables on disk first */
-        List<String> files = new ArrayList<String>();        
-    	lock_.readLock().lock();
+        List<String> files = new ArrayList<String>();
+        lock_.readLock().lock();
         try
         {
             files.addAll(ssTables_);
@@ -551,7 +567,7 @@
         {
             lock_.readLock().unlock();
         }
-    		        	        
+
         for (String file : files)
         {
             /*
@@ -559,15 +575,17 @@
              * is present in the BloomFilter. If not continue to the next file.
             */
             boolean bVal = SSTable.isKeyInFile(key, file);
-            if ( !bVal )
+            if (!bVal)
+            {
                 continue;
+            }
             ColumnFamily columnFamily = fetchColumnFamily(key, cf, filter, file);
             if (columnFamily != null)
             {
                 columnFamilies.add(columnFamily);
-                if(filter.isDone())
+                if (filter.isDone())
                 {
-                	break;
+                    break;
                 }
             }
         }
@@ -575,17 +593,21 @@
 
 
     private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
-	{
-		SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
-		DataInputBuffer bufIn;
-		bufIn = filter.next(key, cf, ssTable);
-		if (bufIn.getLength() == 0)
-			return null;
+    {
+        SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
+        DataInputBuffer bufIn;
+        bufIn = filter.next(key, cf, ssTable);
+        if (bufIn.getLength() == 0)
+        {
+            return null;
+        }
         ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
-		if (columnFamily == null)
-			return null;
-		return columnFamily;
-	}
+        if (columnFamily == null)
+        {
+            return null;
+        }
+        return columnFamily;
+    }
 
     private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
     {
@@ -597,7 +619,9 @@
         }
     }
 
-    /** like resolve, but leaves the resolved CF as the only item in the list */
+    /**
+     * like resolve, but leaves the resolved CF as the only item in the list
+     */
     private static void merge(List<ColumnFamily> columnFamilies)
     {
         ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
@@ -605,7 +629,8 @@
         columnFamilies.add(cf);
     }
 
-    private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
+    private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies)
+    {
         ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
         return removeDeleted(cf);
     }
@@ -618,13 +643,15 @@
      */
     static ColumnFamily removeDeleted(ColumnFamily cf)
     {
-        return removeDeleted(cf, (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds());
+        return removeDeleted(cf, (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds());
     }
 
     static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
     {
         if (cf == null)
+        {
             return null;
+        }
 
         // in case of a timestamp tie, tombstones get priority over non-tombstones.
         // we want this to be deterministic in general to avoid confusion;
@@ -679,7 +706,7 @@
      */
     void applyNow(String key, ColumnFamily columnFamily) throws IOException
     {
-         memtable_.get().putOnRecovery(key, columnFamily);
+        memtable_.get().putOnRecovery(key, columnFamily);
     }
 
     /*
@@ -689,8 +716,10 @@
      */
     void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
     {
-        if ( cLogCtx.isValidContext() )
+        if (cLogCtx.isValidContext())
+        {
             CommitLog.open(table_).onMemtableFlush(columnFamily_, cLogCtx);
+        }
     }
 
     /*
@@ -706,7 +735,7 @@
     void storeLocation(String filename, BloomFilter bf)
     {
         int ssTableSize = 0;
-    	lock_.writeLock().lock();
+        lock_.writeLock().lock();
         try
         {
             ssTables_.add(filename);
@@ -715,7 +744,7 @@
         }
         finally
         {
-        	lock_.writeLock().unlock();
+            lock_.writeLock().unlock();
         }
 
         if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
@@ -730,34 +759,36 @@
     PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
-        if (files.size() > 1 || (ranges != null &&  files.size() > 0))
+        if (files.size() > 1 || (ranges != null && files.size() > 0))
         {
-            int bufferSize = Math.min( (ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize ) ;
+            int bufferSize = Math.min((ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize);
             FileStruct fs = null;
             for (String file : files)
             {
-            	try
-            	{
-            		fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize), StorageService.getPartitioner());
-	                fs.advance();
-	                if(fs.isExhausted())
-	                	continue;
-	                pq.add(fs);
-            	}
-            	catch ( Exception ex)
-            	{
+                try
+                {
+                    fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize), StorageService.getPartitioner());
+                    fs.advance();
+                    if (fs.isExhausted())
+                    {
+                        continue;
+                    }
+                    pq.add(fs);
+                }
+                catch (Exception ex)
+                {
                     logger_.warn("corrupt file?  or are you just blowing away data files manually out from under me?", ex);
-            		try
-            		{
-            			if (fs != null)
-            			{
-            				fs.close();
-            			}
-            		}
-            		catch(Exception e)
-            		{
-            			logger_.warn("Unable to close file :" + file);
-            		}
+                    try
+                    {
+                        if (fs != null)
+                        {
+                            fs.close();
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        logger_.warn("Unable to close file :" + file);
+                    }
                 }
             }
         }
@@ -769,39 +800,39 @@
      */
     static Set<List<String>> getCompactionBuckets(List<String> files, long min)
     {
-    	Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
-    	for(String fname : files)
-    	{
-    		File f = new File(fname);
-    		long size = f.length();
+        Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
+        for (String fname : files)
+        {
+            File f = new File(fname);
+            long size = f.length();
 
-    		boolean bFound = false;
+            boolean bFound = false;
             // look for a bucket containing similar-sized files:
             // group in the same bucket if it's w/in 50% of the average for this bucket,
             // or this file and the bucket are all considered "small" (less than `min`)
             for (List<String> bucket : buckets.keySet())
-    		{
+            {
                 long averageSize = buckets.get(bucket);
-                if ((size > averageSize/2 && size < 3*averageSize/2)
-                    || ( size < min && averageSize < min))
-    			{
+                if ((size > averageSize / 2 && size < 3 * averageSize / 2)
+                    || (size < min && averageSize < min))
+                {
                     // remove and re-add because adding changes the hash
                     buckets.remove(bucket);
-    				averageSize = (averageSize + size) / 2 ;
+                    averageSize = (averageSize + size) / 2;
                     bucket.add(fname);
                     buckets.put(bucket, averageSize);
-    				bFound = true;
-    				break;
-    			}
-    		}
+                    bFound = true;
+                    break;
+                }
+            }
             // no similar bucket found; put it in a new one
-    		if(!bFound)
-    		{
+            if (!bFound)
+            {
                 ArrayList<String> bucket = new ArrayList<String>();
                 bucket.add(fname);
                 buckets.put(bucket, size);
-    		}
-    	}
+            }
+        }
 
         return buckets.keySet();
     }
@@ -854,7 +885,7 @@
 
     void doMajorCompaction(long skip)
     {
-    	doMajorCompactionInternal( skip );
+        doMajorCompactionInternal(skip);
     }
 
     /*
@@ -870,23 +901,23 @@
         List<String> files;
         try
         {
-        	 if( skip > 0L )
-        	 {
-        		 files = new ArrayList<String>();
-	        	 for ( String file : filesInternal )
-	        	 {
-	        		 File f = new File(file);
-	        		 if( f.length() < skip*1024L*1024L*1024L )
-	        		 {
-	        			 files.add(file);
-	        		 }
-	        	 }
-        	 }
-        	 else
-        	 {
-        		 files = filesInternal;
-        	 }
-        	 doFileCompaction(files, BUFSIZE);
+            if (skip > 0L)
+            {
+                files = new ArrayList<String>();
+                for (String file : filesInternal)
+                {
+                    File f = new File(file);
+                    if (f.length() < skip * 1024L * 1024L * 1024L)
+                    {
+                        files.add(file);
+                    }
+                }
+            }
+            else
+            {
+                files = filesInternal;
+            }
+            doFileCompaction(files, BUFSIZE);
         }
         catch (IOException ex)
         {
@@ -894,7 +925,7 @@
         }
         finally
         {
-        	isCompacting_.set(false);
+            isCompacting_.set(false);
         }
     }
 
@@ -904,33 +935,33 @@
      */
     long getExpectedCompactedFileSize(List<String> files)
     {
-    	long expectedFileSize = 0;
-    	for(String file : files)
-    	{
-    		File f = new File(file);
-    		long size = f.length();
-    		expectedFileSize = expectedFileSize + size;
-    	}
-    	return expectedFileSize;
+        long expectedFileSize = 0;
+        for (String file : files)
+        {
+            File f = new File(file);
+            long size = f.length();
+            expectedFileSize = expectedFileSize + size;
+        }
+        return expectedFileSize;
     }
 
     /*
      *  Find the maximum size file in the list .
      */
-    String getMaxSizeFile( List<String> files )
+    String getMaxSizeFile(List<String> files)
     {
-    	long maxSize = 0L;
-    	String maxFile = null;
-    	for ( String file : files )
-    	{
-    		File f = new File(file);
-    		if(f.length() > maxSize )
-    		{
-    			maxSize = f.length();
-    			maxFile = file;
-    		}
-    	}
-    	return maxFile;
+        long maxSize = 0L;
+        String maxFile = null;
+        for (String file : files)
+        {
+            File f = new File(file);
+            if (f.length() > maxSize)
+            {
+                maxSize = f.length();
+                maxFile = file;
+            }
+        }
+        return maxFile;
     }
 
     boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
@@ -940,11 +971,11 @@
         boolean result = true;
         try
         {
-        	 result = doFileAntiCompaction(files, ranges, target, fileList, null);
+            result = doFileAntiCompaction(files, ranges, target, fileList, null);
         }
         finally
         {
-        	isCompacting_.set(false);
+            isCompacting_.set(false);
         }
         return result;
 
@@ -952,12 +983,13 @@
 
     void forceCleanup()
     {
-    	MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+        MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
     }
-    
+
     /**
-     * This function goes over each file and removes the keys that the node is not responsible for 
+     * This function goes over each file and removes the keys that the node is not responsible for
      * and only keeps keys that this node is responsible for.
+     *
      * @throws IOException
      */
     void doCleanupCompaction()
@@ -966,33 +998,37 @@
         List<String> files = new ArrayList<String>(ssTables_);
         try
         {
-            for(String file: files)
+            for (String file : files)
             {
                 doCleanup(file);
             }
         }
         finally
         {
-        	isCompacting_.set(false);
+            isCompacting_.set(false);
         }
     }
+
     /**
      * cleans up one particular file by removing keys that this node is not responsible for.
+     *
      * @param file
      * @throws IOException
      */
     /* TODO: Take care of the comments later. */
     void doCleanup(String file)
     {
-    	if(file == null )
-    		return;
+        if (file == null)
+        {
+            return;
+        }
         List<Range> myRanges;
-    	List<String> files = new ArrayList<String>();
-    	files.add(file);
-    	List<String> newFiles = new ArrayList<String>();
-    	Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
-    	myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
-    	List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
+        List<String> files = new ArrayList<String>();
+        files.add(file);
+        List<String> newFiles = new ArrayList<String>();
+        Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
+        myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
+        List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
         doFileAntiCompaction(files, myRanges, null, newFiles, compactedBloomFilters);
         logger_.debug("Original file : " + file + " of size " + new File(file).length());
         lock_.writeLock().lock();
@@ -1001,9 +1037,9 @@
             ssTables_.remove(file);
             SSTable.removeAssociatedBloomFilter(file);
             for (String newfile : newFiles)
-            {                            	
+            {
                 logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
-                if ( newfile != null )
+                if (newfile != null)
                 {
                     ssTables_.add(newfile);
                     logger_.debug("Inserting bloom filter for file " + newfile);
@@ -1017,10 +1053,11 @@
             lock_.writeLock().unlock();
         }
     }
-    
+
     /**
      * This function is used to do the anti compaction process , it spits out the file which has keys that belong to a given range
      * If the target is not specified it spits out the file as a compacted file with the unecessary ranges wiped out.
+     *
      * @param files
      * @param ranges
      * @param target
@@ -1030,7 +1067,7 @@
      */
     boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
     {
-    	boolean result = false;
+        boolean result = false;
         long startTime = System.currentTimeMillis();
         long totalBytesRead = 0;
         long totalBytesWritten = 0;
@@ -1041,185 +1078,191 @@
         IPartitioner p = StorageService.getPartitioner();
         try
         {
-	        // Calculate the expected compacted filesize
-	    	long expectedRangeFileSize = getExpectedCompactedFileSize(files);
-	    	/* in the worst case a node will be giving out alf of its data so we take a chance */
-	    	expectedRangeFileSize = expectedRangeFileSize / 2;
-	        rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
-	        // If the compaction file path is null that means we have no space left for this compaction.
-	        if( rangeFileLocation == null )
-	        {
-	            logger_.warn("Total bytes to be written for range compaction  ..."
-	                    + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
-	            return result;
-	        }
-	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
-	        if (pq.size() > 0)
-	        {
-	            mergedFileName = getTempFileName();
-	            SSTable ssTableRange = null ;
-	            String lastkey = null;
-	            List<FileStruct> lfs = new ArrayList<FileStruct>();
-	            DataOutputBuffer bufOut = new DataOutputBuffer();
-	            int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-	            expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
-	            logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-	            /* Create the bloom filter for the compacted file. */
-	            BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
-	            List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
-	            while (pq.size() > 0 || lfs.size() > 0)
-	            {
-	                FileStruct fs = null;
-	                if (pq.size() > 0)
-	                {
-	                    fs = pq.poll();
-	                }
-	                if (fs != null
-	                        && (lastkey == null || lastkey.equals(fs.getKey())))
-	                {
-	                    // The keys are the same so we need to add this to the
-	                    // ldfs list
-	                    lastkey = fs.getKey();
-	                    lfs.add(fs);
-	                }
-	                else
-	                {
-	                    Collections.sort(lfs, new FileStructComparator());
-	                    ColumnFamily columnFamily;
-	                    bufOut.reset();
-	                    if(lfs.size() > 1)
-	                    {
-		                    for (FileStruct filestruct : lfs)
-		                    {
-		                    	try
-		                    	{
-	                                /* read the length although we don't need it */
-	                                filestruct.getBufIn().readInt();
-	                                // Skip the Index
+            // Calculate the expected compacted filesize
+            long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+            /* in the worst case a node will be giving out alf of its data so we take a chance */
+            expectedRangeFileSize = expectedRangeFileSize / 2;
+            rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
+            // If the compaction file path is null that means we have no space left for this compaction.
+            if (rangeFileLocation == null)
+            {
+                logger_.warn("Total bytes to be written for range compaction  ..."
+                             + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
+                return result;
+            }
+            PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+            if (pq.size() > 0)
+            {
+                mergedFileName = getTempFileName();
+                SSTable ssTableRange = null;
+                String lastkey = null;
+                List<FileStruct> lfs = new ArrayList<FileStruct>();
+                DataOutputBuffer bufOut = new DataOutputBuffer();
+                int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+                expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+                logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+                /* Create the bloom filter for the compacted file. */
+                BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+                List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+                while (pq.size() > 0 || lfs.size() > 0)
+                {
+                    FileStruct fs = null;
+                    if (pq.size() > 0)
+                    {
+                        fs = pq.poll();
+                    }
+                    if (fs != null
+                        && (lastkey == null || lastkey.equals(fs.getKey())))
+                    {
+                        // The keys are the same so we need to add this to the
+                        // ldfs list
+                        lastkey = fs.getKey();
+                        lfs.add(fs);
+                    }
+                    else
+                    {
+                        Collections.sort(lfs, new FileStructComparator());
+                        ColumnFamily columnFamily;
+                        bufOut.reset();
+                        if (lfs.size() > 1)
+                        {
+                            for (FileStruct filestruct : lfs)
+                            {
+                                try
+                                {
+                                    /* read the length although we don't need it */
+                                    filestruct.getBufIn().readInt();
+                                    // Skip the Index
                                     IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
-	                                if(columnFamilies.size() > 1)
-	                                {
-	    		                        // Now merge the 2 column families
+                                    // We want to add only 2 and resolve them right there in order to save on memory footprint
+                                    if (columnFamilies.size() > 1)
+                                    {
+                                        // Now merge the 2 column families
                                         merge(columnFamilies);
-	                                }
-			                        // deserialize into column families
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-		                    	}
-		                    	catch ( Exception ex)
-		                    	{
+                                    }
+                                    // deserialize into column families
+                                    columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+                                }
+                                catch (Exception ex)
+                                {
                                     logger_.warn(LogUtil.throwableToString(ex));
                                 }
-		                    }
-		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
-		                    columnFamilies.clear();
-		                    if( columnFamily != null )
-		                    {
-			                	/* serialize the cf with column indexes */
-			                    ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
-		                    }
-	                    }
-	                    else
-	                    {
-		                    FileStruct filestruct = lfs.get(0);
-	                    	try
-	                    	{
-		                        /* read the length although we don't need it */
-		                        int size = filestruct.getBufIn().readInt();
-		                        bufOut.write(filestruct.getBufIn(), size);
-	                    	}
-	                    	catch ( Exception ex)
-	                    	{
-	                    		logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.close();
-	                            continue;
-	                    	}
-	                    }
+                            }
+                            // Now after merging all crap append to the sstable
+                            columnFamily = resolveAndRemoveDeleted(columnFamilies);
+                            columnFamilies.clear();
+                            if (columnFamily != null)
+                            {
+                                /* serialize the cf with column indexes */
+                                ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+                            }
+                        }
+                        else
+                        {
+                            FileStruct filestruct = lfs.get(0);
+                            try
+                            {
+                                /* read the length although we don't need it */
+                                int size = filestruct.getBufIn().readInt();
+                                bufOut.write(filestruct.getBufIn(), size);
+                            }
+                            catch (Exception ex)
+                            {
+                                logger_.warn(LogUtil.throwableToString(ex));
+                                filestruct.close();
+                                continue;
+                            }
+                        }
                         if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
-	                    {
-	                        if(ssTableRange == null )
-	                        {
-	                        	if( target != null )
-	                        		rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
-	                	        FileUtils.createDirectory(rangeFileLocation);
-	                            ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
-	                        }	                        
-	                        try
-	                        {
-		                        ssTableRange.append(lastkey, bufOut);
-		                        compactedRangeBloomFilter.add(lastkey);
-	                        }
-	                        catch(Exception ex)
-	                        {
-	                            logger_.warn( LogUtil.throwableToString(ex) );
-	                        }
-	                    }
-	                    totalkeysWritten++;
-	                    for (FileStruct filestruct : lfs)
-	                    {
-	                    	try
-	                    	{
+                        {
+                            if (ssTableRange == null)
+                            {
+                                if (target != null)
+                                {
+                                    rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
+                                }
+                                FileUtils.createDirectory(rangeFileLocation);
+                                ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
+                            }
+                            try
+                            {
+                                ssTableRange.append(lastkey, bufOut);
+                                compactedRangeBloomFilter.add(lastkey);
+                            }
+                            catch (Exception ex)
+                            {
+                                logger_.warn(LogUtil.throwableToString(ex));
+                            }
+                        }
+                        totalkeysWritten++;
+                        for (FileStruct filestruct : lfs)
+                        {
+                            try
+                            {
                                 filestruct.advance();
-	                    		if (filestruct.isExhausted())
-	                    		{
-	                    			continue;
-	                    		}
-	                    		/* keep on looping until we find a key in the range */
+                                if (filestruct.isExhausted())
+                                {
+                                    continue;
+                                }
+                                /* keep on looping until we find a key in the range */
                                 while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
-	                            {
+                                {
                                     filestruct.advance();
                                     if (filestruct.isExhausted())
-		                    		{
-		                    			break;
-		                    		}
-	                            }
-	                            if (!filestruct.isExhausted())
-	                            {
-	                            	pq.add(filestruct);
-	                            }
-		                        totalkeysRead++;
-	                    	}
-	                    	catch ( Exception ex )
-	                    	{
-	                    		// Ignore the exception as it might be a corrupted file
-	                    		// in any case we have read as far as possible from it
-	                    		// and it will be deleted after compaction.
+                                    {
+                                        break;
+                                    }
+                                }
+                                if (!filestruct.isExhausted())
+                                {
+                                    pq.add(filestruct);
+                                }
+                                totalkeysRead++;
+                            }
+                            catch (Exception ex)
+                            {
+                                // Ignore the exception as it might be a corrupted file
+                                // in any case we have read as far as possible from it
+                                // and it will be deleted after compaction.
                                 logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.close();
+                                filestruct.close();
                             }
-	                    }
-	                    lfs.clear();
-	                    lastkey = null;
-	                    if (fs != null)
-	                    {
-	                        // Add back the fs since we processed the rest of
-	                        // filestructs
-	                        pq.add(fs);
-	                    }
-	                }
-	            }
+                        }
+                        lfs.clear();
+                        lastkey = null;
+                        if (fs != null)
+                        {
+                            // Add back the fs since we processed the rest of
+                            // filestructs
+                            pq.add(fs);
+                        }
+                    }
+                }
 
-	            if( ssTableRange != null )
-	            {
+                if (ssTableRange != null)
+                {
                     ssTableRange.closeRename(compactedRangeBloomFilter);
                     if (fileList != null)
+                    {
                         fileList.add(ssTableRange.getDataFileLocation());
+                    }
                     if (compactedBloomFilters != null)
-                    	compactedBloomFilters.add(compactedRangeBloomFilter);
-	            }
-	        }
+                    {
+                        compactedBloomFilters.add(compactedRangeBloomFilter);
+                    }
+                }
+            }
         }
-        catch ( Exception ex)
+        catch (Exception ex)
         {
-            logger_.error( LogUtil.throwableToString(ex) );
+            logger_.error(LogUtil.throwableToString(ex));
         }
         logger_.debug("Total time taken for range split   ..."
-                + (System.currentTimeMillis() - startTime));
+                      + (System.currentTimeMillis() - startTime));
         logger_.debug("Total bytes Read for range split  ..." + totalBytesRead);
         logger_.debug("Total bytes written for range split  ..."
-                + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
+                      + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
         return result;
     }
 
@@ -1227,28 +1270,28 @@
     {
         bf.add(StorageService.getPartitioner().undecorateKey(decoratedKey));
     }
-    
+
     /*
-     * This function does the actual compaction for files.
-     * It maintains a priority queue of with the first key from each file
-     * and then removes the top of the queue and adds it to the SStable and
-     * repeats this process while reading the next from each file until its
-     * done with all the files . The SStable to which the keys are written
-     * represents the new compacted file. Before writing if there are keys
-     * that occur in multiple files and are the same then a resolution is done
-     * to get the latest data.
-     *
-     */
-    private int doFileCompaction(List<String> files,  int minBufferSize) throws IOException
+    * This function does the actual compaction for files.
+    * It maintains a priority queue of with the first key from each file
+    * and then removes the top of the queue and adds it to the SStable and
+    * repeats this process while reading the next from each file until its
+    * done with all the files . The SStable to which the keys are written
+    * represents the new compacted file. Before writing if there are keys
+    * that occur in multiple files and are the same then a resolution is done
+    * to get the latest data.
+    *
+    */
+    private int doFileCompaction(List<String> files, int minBufferSize) throws IOException
     {
         String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
         // If the compaction file path is null that means we have no space left for this compaction.
         // try again w/o the largest one.
-        if( compactionFileLocation == null )
+        if (compactionFileLocation == null)
         {
-            String maxFile = getMaxSizeFile( files );
-            files.remove( maxFile );
-            return doFileCompaction(files , minBufferSize);
+            String maxFile = getMaxSizeFile(files);
+            files.remove(maxFile);
+            return doFileCompaction(files, minBufferSize);
         }
 
         String newfile = null;
@@ -1261,7 +1304,7 @@
 
         if (pq.size() > 0)
         {
-            String mergedFileName = getTempFileName( files );
+            String mergedFileName = getTempFileName(files);
             SSTable ssTable = null;
             String lastkey = null;
             List<FileStruct> lfs = new ArrayList<FileStruct>();
@@ -1281,7 +1324,7 @@
                     fs = pq.poll();
                 }
                 if (fs != null
-                        && (lastkey == null || lastkey.equals(fs.getKey())))
+                    && (lastkey == null || lastkey.equals(fs.getKey())))
                 {
                     // The keys are the same so we need to add this to the
                     // ldfs list
@@ -1293,7 +1336,7 @@
                     Collections.sort(lfs, new FileStructComparator());
                     ColumnFamily columnFamily;
                     bufOut.reset();
-                    if(lfs.size() > 1)
+                    if (lfs.size() > 1)
                     {
                         for (FileStruct filestruct : lfs)
                         {
@@ -1304,14 +1347,14 @@
                                 // Skip the Index
                                 IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
                                 // We want to add only 2 and resolve them right there in order to save on memory footprint
-                                if(columnFamilies.size() > 1)
+                                if (columnFamilies.size() > 1)
                                 {
                                     merge(columnFamilies);
                                 }
                                 // deserialize into column families
                                 columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
                             }
-                            catch ( Exception ex)
+                            catch (Exception ex)
                             {
                                 logger_.warn("error in filecompaction", ex);
                             }
@@ -1319,7 +1362,7 @@
                         // Now after merging all crap append to the sstable
                         columnFamily = resolveAndRemoveDeleted(columnFamilies);
                         columnFamilies.clear();
-                        if( columnFamily != null )
+                        if (columnFamily != null)
                         {
                             /* serialize the cf with column indexes */
                             ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
@@ -1334,7 +1377,7 @@
                             int size = filestruct.getBufIn().readInt();
                             bufOut.write(filestruct.getBufIn(), size);
                         }
-                        catch ( Exception ex)
+                        catch (Exception ex)
                         {
                             logger_.error("empty sstable file " + filestruct.getFileName(), ex);
                             filestruct.close();
@@ -1342,7 +1385,7 @@
                         }
                     }
 
-                    if ( ssTable == null )
+                    if (ssTable == null)
                     {
                         ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
                     }
@@ -1363,7 +1406,7 @@
                             pq.add(filestruct);
                             totalkeysRead++;
                         }
-                        catch ( Throwable ex )
+                        catch (Throwable ex)
                         {
                             // Ignore the exception as it might be a corrupted file
                             // in any case we have read as far as possible from it
@@ -1380,7 +1423,7 @@
                     }
                 }
             }
-            if ( ssTable != null )
+            if (ssTable != null)
             {
                 ssTable.closeRename(compactedBloomFilter);
                 newfile = ssTable.getDataFileLocation();
@@ -1393,7 +1436,7 @@
                     ssTables_.remove(file);
                     SSTable.removeAssociatedBloomFilter(file);
                 }
-                if ( newfile != null )
+                if (newfile != null)
                 {
                     logger_.debug("Inserting bloom filter for file " + newfile);
                     SSTable.storeBloomFilter(newfile, compactedBloomFilter);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=773728&r1=773727&r2=773728&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon May 11 23:52:10 2009
@@ -48,34 +48,34 @@
  * data on disk in sorted fashion. However the sorting is upto
  * the application. This class expects keys to be handed to it
  * in sorted order. SSTable is broken up into blocks where each
- * block contains 128 keys. At the end of the file  the block 
+ * block contains 128 keys. At the end of the file  the block
  * index is written which contains the offsets to the keys in the
- * block. SSTable also maintains an index file to which every 128th 
- * key is written with a pointer to the block index which is the block 
- * that actually contains the key. This index file is then read and 
+ * block. SSTable also maintains an index file to which every 128th
+ * key is written with a pointer to the block index which is the block
+ * that actually contains the key. This index file is then read and
  * maintained in memory. SSTable is append only and immutable. SSTable
  * on disk looks as follows:
- * 
- *                 -------------------------
- *                 |------------------------|<-------|
- *                 |                        |        |  BLOCK-INDEX PTR
- *                 |                        |        |
- *                 |------------------------|--------
- *                 |------------------------|<-------|
- *                 |                        |        |
- *                 |                        |        |  BLOCK-INDEX PTR 
- *                 |                        |        |
- *                 |------------------------|---------
- *                 |------------------------|<--------|
- *                 |                        |         |
- *                 |                        |         |
- *                 |                        |         | BLOCK-INDEX PTR
- *                 |                        |         |
- *                 |------------------------|         |
- *                 |------------------------|----------
- *                 |------------------------|-----------------> BLOOM-FILTER
+ * <p/>
+ * -------------------------
+ * |------------------------|<-------|
+ * |                        |        |  BLOCK-INDEX PTR
+ * |                        |        |
+ * |------------------------|--------
+ * |------------------------|<-------|
+ * |                        |        |
+ * |                        |        |  BLOCK-INDEX PTR
+ * |                        |        |
+ * |------------------------|---------
+ * |------------------------|<--------|
+ * |                        |         |
+ * |                        |         |
+ * |                        |         | BLOCK-INDEX PTR
+ * |                        |         |
+ * |------------------------|         |
+ * |------------------------|----------
+ * |------------------------|-----------------> BLOOM-FILTER
  * version-info <--|----------|-------------|-------> relative offset to last block index.
- *                 
+ * <p/>
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
 
@@ -105,22 +105,22 @@
 
     /**
      * This class holds the position of a key in a block
-     * and the size of the data associated with this key. 
-    */
+     * and the size of the data associated with this key.
+     */
     protected static class BlockMetadata
     {
         protected static final BlockMetadata NULL = new BlockMetadata(-1L, -1L);
-        
+
         long position_;
         long size_;
-        
+
         BlockMetadata(long position, long size)
         {
             position_ = position;
             size_ = size;
         }
     }
-    
+
     /*
      * This abstraction provides LRU symantics for the keys that are 
      * "touched". Currently it holds the offset of the key in a data
@@ -130,24 +130,24 @@
     private static class TouchedKeyCache extends LinkedHashMap<String, Long>
     {
         private final int capacity_;
-        
+
         TouchedKeyCache(int capacity)
         {
             super(capacity + 1, 1.1f, true);
             capacity_ = capacity;
         }
-        
+
         protected boolean removeEldestEntry(Map.Entry<String, Long> entry)
         {
-            return ( size() > capacity_ );
+            return (size() > capacity_);
         }
     }
-    
+
     /**
      * This is a simple container for the index Key and its corresponding position
      * in the data file. Binary search is performed on a list of these objects
      * to lookup keys within the SSTable data file.
-    */
+     */
     public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
     {
         private final String decoratedKey;
@@ -183,30 +183,30 @@
 
         public String toString()
         {
-        	return decoratedKey + ":" + position_;
+            return decoratedKey + ":" + position_;
         }
     }
-    
+
     public static int indexInterval()
     {
-    	return indexInterval_;
+        return indexInterval_;
     }
-    
+
     /*
      * Maintains a list of KeyPositionInfo objects per SSTable file loaded.
      * We do this so that we don't read the index file into memory multiple
      * times.
     */
     static IndexMap indexMetadataMap_ = new IndexMap();
-    
-    /** 
+
+    /**
      * This method deletes both the specified data file
      * and the associated index file
      *
      * @param dataFile - data file associated with the SSTable
-    */
+     */
     public static void delete(String dataFile)
-    {        
+    {
         /* remove the cached index table from memory */
         indexMetadataMap_.remove(dataFile);
         /* Delete the checksum file associated with this data file */
@@ -214,11 +214,11 @@
         {
             ChecksumManager.onFileDelete(dataFile);
         }
-        catch ( IOException ex )
+        catch (IOException ex)
         {
-            logger_.info( LogUtil.throwableToString(ex) );
+            logger_.info(LogUtil.throwableToString(ex));
         }
-        
+
         File file = new File(dataFile);
         assert file.exists();
         /* delete the data file */
@@ -228,39 +228,39 @@
         }
     }
 
-    public static int getApproximateKeyCount( List<String> dataFiles)
+    public static int getApproximateKeyCount(List<String> dataFiles)
     {
-    	int count = 0 ;
+        int count = 0;
 
-    	for(String dataFile : dataFiles )
-    	{    		
-    		List<KeyPositionInfo> index = indexMetadataMap_.get(dataFile);
-    		if (index != null )
-    		{
-    			int indexKeyCount = index.size();
-    			count = count + (indexKeyCount+1) * indexInterval_ ;
-    	        logger_.debug("index size for bloom filter calc for file  : " + dataFile + "   : " + count);
-    		}
-    	}
+        for (String dataFile : dataFiles)
+        {
+            List<KeyPositionInfo> index = indexMetadataMap_.get(dataFile);
+            if (index != null)
+            {
+                int indexKeyCount = index.size();
+                count = count + (indexKeyCount + 1) * indexInterval_;
+                logger_.debug("index size for bloom filter calc for file  : " + dataFile + "   : " + count);
+            }
+        }
 
-    	return count;
+        return count;
     }
 
     /**
      * Get all indexed keys in the SSTable.
-    */
+     */
     public static List<String> getIndexedKeys()
     {
         Set<String> indexFiles = indexMetadataMap_.keySet();
         List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
 
-        for ( String indexFile : indexFiles )
+        for (String indexFile : indexFiles)
         {
-            keyPositionInfos.addAll( indexMetadataMap_.get(indexFile) );
+            keyPositionInfos.addAll(indexMetadataMap_.get(indexFile));
         }
 
         List<String> indexedKeys = new ArrayList<String>();
-        for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
+        for (KeyPositionInfo keyPositionInfo : keyPositionInfos)
         {
             indexedKeys.add(keyPositionInfo.decoratedKey);
         }
@@ -268,7 +268,7 @@
         Collections.sort(indexedKeys);
         return indexedKeys;
     }
-    
+
     /*
      * Intialize the index files and also cache the Bloom Filters
      * associated with these files. Also caches the file handles 
@@ -276,14 +276,14 @@
     */
     public static void onStart(List<String> filenames) throws IOException
     {
-        for ( String filename : filenames )
+        for (String filename : filenames)
         {
             SSTable ssTable = null;
             try
             {
                 ssTable = new SSTable(filename, StorageService.getPartitioner());
             }
-            catch ( IOException ex )
+            catch (IOException ex)
             {
                 logger_.info("Deleting corrupted file " + filename);
                 FileUtils.delete(filename);
@@ -291,7 +291,7 @@
             }
             finally
             {
-                if ( ssTable != null )
+                if (ssTable != null)
                 {
                     ssTable.close();
                 }
@@ -323,7 +323,7 @@
     {
         boolean bVal = false;
         BloomFilter bf = bfs_.get(filename);
-        if ( bf != null )
+        if (bf != null)
         {
             bVal = bf.isPresent(clientKey);
         }
@@ -332,20 +332,20 @@
 
     String dataFile_;
     private IFileWriter dataWriter_;
-    private String lastWrittenKey_;    
-    private long firstBlockPosition_ = 0L;    
+    private String lastWrittenKey_;
+    private long firstBlockPosition_ = 0L;
     private int indexKeysWritten_ = 0;
     /* Holds the keys and their respective positions of the current block index */
-    private SortedMap<String, BlockMetadata> blockIndex_;    
+    private SortedMap<String, BlockMetadata> blockIndex_;
     /* Holds all the block indicies for this SSTable */
     private List<SortedMap<String, BlockMetadata>> blockIndexes_;
     private IPartitioner partitioner_;
-    
+
     /**
      * This ctor basically gets passed in the full path name
      * of the data file associated with this SSTable. Use this
      * ctor to read the data in this file.
-    */
+     */
     public SSTable(String dataFileName, IPartitioner partitioner) throws IOException
     {
         dataFile_ = dataFileName;
@@ -356,19 +356,19 @@
     /**
      * This ctor is used for writing data into the SSTable. Use this
      * version for non DB writes to the SSTable.
-    */
+     */
     public SSTable(String directory, String filename, IPartitioner partitioner) throws IOException
     {
         dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
         partitioner_ = partitioner;
         blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
         blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
         SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
-    } 
+    }
 
     private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
-    {        
+    {
         /* read the position of the bloom filter */
         indexReader.seek(size - 8);
         byte[] bytes = new byte[8];
@@ -380,11 +380,11 @@
         DataOutputBuffer bufOut = new DataOutputBuffer();
         DataInputBuffer bufIn = new DataInputBuffer();
         /* read the bloom filter from disk */
-        indexReader.next(bufOut);   
+        indexReader.next(bufOut);
         bufOut.close();
         bufIn.reset(bufOut.getData(), bufOut.getLength());
         String clientKey = bufIn.readUTF();
-        if ( clientKey.equals(SequenceFile.marker_) )
+        if (clientKey.equals(SequenceFile.marker_))
         {
             /*
              * We are now reading the serialized Bloom Filter. We read
@@ -394,20 +394,22 @@
              * need not read the rest of the file.
             */
             bufIn.readInt();
-            if ( bfs_.get(dataFile_) == null )
+            if (bfs_.get(dataFile_) == null)
+            {
                 bfs_.put(dataFile_, BloomFilter.serializer().deserialize(bufIn));
+            }
         }
     }
-    
+
     private void loadIndexFile() throws IOException
-    {    
+    {
         IFileReader indexReader = null;
         /* Read all block indexes to maintain an index in memory */
         try
-        {            
-            indexReader = SequenceFile.bufferedReader(dataFile_, 4*1024*1024);
+        {
+            indexReader = SequenceFile.bufferedReader(dataFile_, 4 * 1024 * 1024);
             long size = indexReader.getEOF();
-            
+
             /* load the bloom filter into memory */
             loadBloomFilter(indexReader, size);
             /* read the position of the last block index */
@@ -417,40 +419,40 @@
             /* the beginning of the first block index */
             long currentPosition = indexReader.getCurrentPosition();
             indexReader.readDirect(bytes);
-            long firstBlockIndexPosition = BasicUtilities.byteArrayToLong(bytes);  
+            long firstBlockIndexPosition = BasicUtilities.byteArrayToLong(bytes);
             List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
             indexMetadataMap_.put(dataFile_, keyPositionInfos);
             DataOutputBuffer bufOut = new DataOutputBuffer();
-            DataInputBuffer bufIn = new DataInputBuffer();        
-            
+            DataInputBuffer bufIn = new DataInputBuffer();
+
             long nextPosition = currentPosition - firstBlockIndexPosition;
             indexReader.seek(nextPosition);
             /* read the block indexes from the end of the file till we hit the first one. */
-            while ( nextPosition > 0 )
+            while (nextPosition > 0)
             {
                 bufOut.reset();
                 /* position @ the current block index being processed */
                 currentPosition = indexReader.getCurrentPosition();
                 long bytesRead = indexReader.next(bufOut);
-                if ( bytesRead != -1 )
+                if (bytesRead != -1)
                 {
                     bufIn.reset(bufOut.getData(), bufOut.getLength());
                     /* read the block key. */
                     String blockIndexKey = bufIn.readUTF();
-                    if ( !blockIndexKey.equals(SSTable.blockIndexKey_) )
+                    if (!blockIndexKey.equals(SSTable.blockIndexKey_))
                     {
-                    	logger_.debug(" Done reading the block indexes, Index has been created");
-                    	break;
+                        logger_.debug(" Done reading the block indexes, Index has been created");
+                        break;
                     }
                     /* read the size of the block index */
-                    bufIn.readInt();                    
+                    bufIn.readInt();
                     /* Number of keys in the block. */
                     int keys = bufIn.readInt();
                     String largestKeyInBlock;
-                    for ( int i = 0; i < keys; ++i )
+                    for (int i = 0; i < keys; ++i)
                     {
                         String keyInBlock = bufIn.readUTF();
-                        if ( i == 0 )
+                        if (i == 0)
                         {
                             largestKeyInBlock = keyInBlock;
                             /* relative offset in the block for the key*/
@@ -458,7 +460,7 @@
                             /* size of data associated with the key */
                             bufIn.readLong();
                             /* load the actual position of the block index into the index map */
-                            keyPositionInfos.add( new KeyPositionInfo(largestKeyInBlock, partitioner_, currentPosition) );
+                            keyPositionInfos.add(new KeyPositionInfo(largestKeyInBlock, partitioner_, currentPosition));
                         }
                         else
                         {
@@ -476,30 +478,30 @@
             bufIn.close();
             bufOut.close();
             Collections.sort(keyPositionInfos);
-        }        
+        }
         finally
         {
-            if ( indexReader != null )
+            if (indexReader != null)
             {
                 indexReader.close();
             }
-        }        
+        }
     }
 
     private void init() throws IOException
-    {        
+    {
         /*
          * this is to prevent multiple threads from
          * loading the same index files multiple times
          * into memory.
         */
-        synchronized( indexLoadLock_ )
+        synchronized (indexLoadLock_)
         {
-            if ( indexMetadataMap_.get(dataFile_) == null )
+            if (indexMetadataMap_.get(dataFile_) == null)
             {
                 long start = System.currentTimeMillis();
                 loadIndexFile();
-                logger_.debug("INDEX LOAD TIME: " + (System.currentTimeMillis() - start) + " ms.");                
+                logger_.debug("INDEX LOAD TIME: " + (System.currentTimeMillis() - start) + " ms.");
             }
         }
     }
@@ -507,8 +509,10 @@
     private String getFile(String name) throws IOException
     {
         File file = new File(name);
-        if ( file.exists() )
+        if (file.exists())
+        {
             return file.getAbsolutePath();
+        }
         throw new IOException("File " + name + " was not found on disk.");
     }
 
@@ -523,26 +527,28 @@
     public void touch(final String clientKey, boolean fData) throws IOException
     {
         if (touchCache_.containsKey(dataFile_ + ":" + clientKey))
+        {
             return;
-        
-        IFileReader dataReader = SequenceFile.reader(dataFile_); 
+        }
+
+        IFileReader dataReader = SequenceFile.reader(dataFile_);
         try
         {
-        	/* Morph the key */
+            /* Morph the key */
             String decoratedKey = partitioner_.decorateKey(clientKey);
             Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader, partitioner_);
             /* Get offset of key from block Index */
             dataReader.seek(fileCoordinate.end_);
             BlockMetadata blockMetadata = dataReader.getBlockMetadata(decoratedKey);
-            if ( blockMetadata.position_ != -1L )
+            if (blockMetadata.position_ != -1L)
             {
                 touchCache_.put(dataFile_ + ":" + clientKey, blockMetadata.position_);
-            } 
-            
-            if ( fData )
+            }
+
+            if (fData)
             {
                 /* Read the data associated with this key and pull it into the Buffer Cache */
-                if ( blockMetadata.position_ != -1L )
+                if (blockMetadata.position_ != -1L)
                 {
                     dataReader.seek(blockMetadata.position_);
                     DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -554,17 +560,21 @@
         }
         finally
         {
-            if ( dataReader != null )
+            if (dataReader != null)
+            {
                 dataReader.close();
+            }
         }
     }
 
     private long beforeAppend(String decoratedKey) throws IOException
     {
-    	if (decoratedKey == null )
+        if (decoratedKey == null)
+        {
             throw new IOException("Keys must not be null.");
+        }
         Comparator<String> c = partitioner_.getDecoratedKeyComparator();
-        if ( lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0 )
+        if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
         {
             logger_.info("Last written key : " + lastWrittenKey_);
             logger_.info("Current key : " + decoratedKey);
@@ -579,45 +589,48 @@
         ++indexKeysWritten_;
         lastWrittenKey_ = decoratedKey;
         blockIndex_.put(decoratedKey, new BlockMetadata(position, size));
-        if ( indexKeysWritten_ == indexInterval_ )
+        if (indexKeysWritten_ == indexInterval_)
         {
-        	blockIndexes_.add(blockIndex_);
-        	blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
+            blockIndexes_.add(blockIndex_);
+            blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
             indexKeysWritten_ = 0;
-        }                
+        }
     }
 
     /**
      * Dumps all the block indicies for this SSTable
      * at the end of the file.
+     *
      * @throws IOException
      */
     private void dumpBlockIndexes() throws IOException
     {
         firstBlockPosition_ = dataWriter_.getCurrentPosition();
-    	for( SortedMap<String, BlockMetadata> block : blockIndexes_ )
-    	{
-    		dumpBlockIndex( block );
-    	}  	
-    }    
-    
-    private void dumpBlockIndex( SortedMap<String, BlockMetadata> blockIndex) throws IOException
+        for (SortedMap<String, BlockMetadata> block : blockIndexes_)
+        {
+            dumpBlockIndex(block);
+        }
+    }
+
+    private void dumpBlockIndex(SortedMap<String, BlockMetadata> blockIndex) throws IOException
     {
         /* Block Index is empty so bail. */
-        if ( blockIndex.size() == 0 )
+        if (blockIndex.size() == 0)
+        {
             return;
-        
+        }
+
         DataOutputBuffer bufOut = new DataOutputBuffer();
         /* 
          * Record the position where we start writing the block index. This is will be
          * used as the position of the lastWrittenKey in the block in the index file
         */
         long position = dataWriter_.getCurrentPosition();
-        Set<String> keys = blockIndex.keySet();                
+        Set<String> keys = blockIndex.keySet();
         /* Number of keys in this block */
         bufOut.writeInt(keys.size());
-        for ( String decoratedKey : keys )
-        {            
+        for (String decoratedKey : keys)
+        {
             bufOut.writeUTF(decoratedKey);
             BlockMetadata blockMetadata = blockIndex.get(decoratedKey);
             /* position of the key as a relative offset */
@@ -628,14 +641,14 @@
         dataWriter_.append(SSTable.blockIndexKey_, bufOut);
         /* Load this index into the in memory index map */
         List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.get(dataFile_);
-        if ( keyPositionInfos == null )
+        if (keyPositionInfos == null)
         {
-        	keyPositionInfos = new ArrayList<KeyPositionInfo>();
-        	SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+            keyPositionInfos = new ArrayList<KeyPositionInfo>();
+            SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
         }
-        
+
         keyPositionInfos.add(new KeyPositionInfo(blockIndex.firstKey(), partitioner_, position));
-        blockIndex.clear();        
+        blockIndex.clear();
     }
 
     public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
@@ -649,7 +662,7 @@
     {
         long currentPosition = beforeAppend(decoratedKey);
         dataWriter_.append(decoratedKey, value);
-        afterAppend(decoratedKey, currentPosition, value.length );
+        afterAppend(decoratedKey, currentPosition, value.length);
     }
 
     /*
@@ -658,24 +671,24 @@
      */
     public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader, IPartitioner partitioner) throws IOException
     {
-    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
+        List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
         assert indexInfo != null && indexInfo.size() > 0;
         long start = 0L;
-    	long end;
+        long end;
         int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey, partitioner));
-        if ( index < 0 )
+        if (index < 0)
         {
             /*
              * We are here which means that the requested
              * key is not an index.
             */
-            index = (++index)*(-1);
+            index = (++index) * (-1);
             /*
              * This means key is not present at all. Hence
              * a scan is in order.
             */
             start = (index == 0) ? 0 : indexInfo.get(index - 1).position();
-            if ( index < indexInfo.size())
+            if (index < indexInfo.size())
             {
                 end = indexInfo.get(index).position();
             }
@@ -698,7 +711,7 @@
         }
         return new Coordinate(start, end);
     }
-    
+
     public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
     {
         return next(clientKey, cfName, columnNames, null);
@@ -723,9 +736,9 @@
             DataInputBuffer bufIn = new DataInputBuffer();
 
             long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, fileCoordinate);
-            if ( bytesRead != -1L )
+            if (bytesRead != -1L)
             {
-                if ( bufOut.getLength() > 0 )
+                if (bufOut.getLength() > 0)
                 {
                     bufIn.reset(bufOut.getData(), bufOut.getLength());
                     /* read the key even though we do not use it */
@@ -754,16 +767,16 @@
 
     public void close() throws IOException
     {
-        close( new byte[0], 0 );
+        close(new byte[0], 0);
     }
 
     public void close(BloomFilter bf) throws IOException
     {
         /* Any remnants in the blockIndex should be added to the dump */
-    	blockIndexes_.add(blockIndex_);
-    	dumpBlockIndexes();
-        
-    	/* reset the buffer and serialize the Bloom Filter. */
+        blockIndexes_.add(blockIndex_);
+        dumpBlockIndexes();
+
+        /* reset the buffer and serialize the Bloom Filter. */
         DataOutputBuffer bufOut = new DataOutputBuffer();
         BloomFilter.serializer().serialize(bf, bufOut);
         close(bufOut.getData(), bufOut.getLength());
@@ -778,15 +791,15 @@
      */
     public void closeRename(BloomFilter bf) throws IOException
     {
-    	close(bf);
+        close(bf);
         String tmpDataFile = dataFile_;
-    	String dataFileName = dataFile_.replace("-" + temporaryFile_,"");    	
-    	File dataFile = new File(dataFile_);
-    	dataFile.renameTo(new File(dataFileName));
+        String dataFileName = dataFile_.replace("-" + temporaryFile_, "");
+        File dataFile = new File(dataFile_);
+        dataFile.renameTo(new File(dataFileName));
         dataFile_ = dataFileName;
         /* Now repair the in memory index associated with the old name */
-    	List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);    	    	  	    	
-    	SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+        List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);
+        SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
     }
 
     private void close(byte[] footer, int size) throws IOException
@@ -798,19 +811,19 @@
          * block index and the last one is the position of
          * the Bloom Filter.
          */
-        if ( dataWriter_ != null )
-        {            
+        if (dataWriter_ != null)
+        {
             long bloomFilterPosition = dataWriter_.getCurrentPosition();
             dataWriter_.close(footer, size);
-            /* write the version field into the SSTable */           
+            /* write the version field into the SSTable */
             dataWriter_.writeDirect(BasicUtilities.longToByteArray(version_));
             /* write the relative position of the first block index from current position */
             long blockPosition = dataWriter_.getCurrentPosition() - firstBlockPosition_;
             dataWriter_.writeDirect(BasicUtilities.longToByteArray(blockPosition));
-            
+
             /* write the position of the bloom filter */
             long bloomFilterRelativePosition = dataWriter_.getCurrentPosition() - bloomFilterPosition;
-            dataWriter_.writeDirect(BasicUtilities.longToByteArray(bloomFilterRelativePosition));            
+            dataWriter_.writeDirect(BasicUtilities.longToByteArray(bloomFilterRelativePosition));
             dataWriter_.close();
         }
     }



Mime
View raw message