cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [4/5] clean up ioexceptions patch by Aleksey Yeschenko and jbellis for CASSANDRA-2116
Date Fri, 27 Jul 2012 20:47:19 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e188636..381b934 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,29 +24,29 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.*;
 
-import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.index.keys.KeysIndex;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.InstrumentingCache;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 
@@ -271,7 +271,7 @@ public class SSTableReader extends SSTable
                                       IndexSummary isummary,
                                       Filter bf,
                                       long maxDataAge,
-                                      SSTableMetadata sstableMetadata) throws IOException
+                                      SSTableMetadata sstableMetadata)
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
         return new SSTableReader(desc,
@@ -295,7 +295,6 @@ public class SSTableReader extends SSTable
                           Filter bloomFilter,
                           long maxDataAge,
                           SSTableMetadata sstableMetadata)
-    throws IOException
     {
         super(desc, components, metadata, partitioner);
         this.sstableMetadata = sstableMetadata;
@@ -788,7 +787,7 @@ public class SSTableReader extends SSTable
             catch (IOException e)
             {
                 markSuspect();
-                throw new IOError(e);
+                throw new CorruptSSTableException(e, input.getPath());
             }
             finally
             {
@@ -930,13 +929,13 @@ public class SSTableReader extends SSTable
         return in.readLong();
     }
 
-    public void createLinks(String snapshotDirectoryPath) throws IOException
+    public void createLinks(String snapshotDirectoryPath)
     {
         for (Component component : components)
         {
             File sourceFile = new File(descriptor.filenameFor(component));
             File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-            CLibrary.createHardLink(sourceFile, targetLink);
+            FileUtils.createHardLink(sourceFile, targetLink);
         }
     }
 
@@ -1046,14 +1045,14 @@ public class SSTableReader extends SSTable
         return sstableMetadata.ancestors;
     }
 
-    public RandomAccessReader openDataReader(boolean skipIOCache) throws IOException
+    public RandomAccessReader openDataReader(boolean skipIOCache)
     {
         return compression
                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(), skipIOCache)
                : RandomAccessReader.open(new File(getFilename()), skipIOCache);
     }
 
-    public RandomAccessReader openIndexReader(boolean skipIOCache) throws IOException
+    public RandomAccessReader openIndexReader(boolean skipIOCache)
     {
         return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache);
     }
@@ -1090,14 +1089,7 @@ public class SSTableReader extends SSTable
     {
         for (SSTableReader sstable : sstables)
         {
-            try
-            {
-                sstable.releaseReference();
-            }
-            catch (Exception ex)
-            {
-                logger.error("Failed releasing reference on " + sstable, ex);
-            }
+            sstable.releaseReference();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 94e8522..dafb12b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.IOError;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -52,16 +51,8 @@ public class SSTableScanner implements ICompactionScanner
      */
     SSTableScanner(SSTableReader sstable, boolean skipCache)
     {
-        try
-        {
-            this.dfile = sstable.openDataReader(skipCache);
-            this.ifile = sstable.openIndexReader(skipCache);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new IOError(e);
-        }
+        this.dfile = sstable.openDataReader(skipCache);
+        this.ifile = sstable.openIndexReader(skipCache);
         this.sstable = sstable;
         this.filter = null;
     }
@@ -72,16 +63,8 @@ public class SSTableScanner implements ICompactionScanner
      */
     SSTableScanner(SSTableReader sstable, QueryFilter filter)
     {
-        try
-        {
-            this.dfile = sstable.openDataReader(false);
-            this.ifile = sstable.openIndexReader(false);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new IOError(e);
-        }
+        this.dfile = sstable.openDataReader(false);
+        this.ifile = sstable.openIndexReader(false);
         this.sstable = sstable;
         this.filter = filter;
     }
@@ -126,20 +109,13 @@ public class SSTableScanner implements ICompactionScanner
         catch (IOException e)
         {
             sstable.markSuspect();
-            throw new RuntimeException("corrupt sstable", e);
+            throw new CorruptSSTableException(e, ifile.getPath());
         }
     }
 
     public long getLengthInBytes()
     {
-        try
-        {
-            return dfile.length();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        return dfile.length();
     }
 
     public long getCurrentPosition()
@@ -182,17 +158,9 @@ public class SSTableScanner implements ICompactionScanner
 
         public boolean hasNext()
         {
-            try
-            {
-                if (row == null)
-                    return !dfile.isEOF();
-                return finishedAt < dfile.length();
-            }
-            catch (IOException e)
-            {
-                sstable.markSuspect();
-                throw new RuntimeException(e);
-            }
+            if (row == null)
+                return !dfile.isEOF();
+            return finishedAt < dfile.length();
         }
 
         public OnDiskAtomIterator next()
@@ -215,7 +183,7 @@ public class SSTableScanner implements ICompactionScanner
             catch (IOException e)
             {
                 sstable.markSuspect();
-                throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e);
+                throw new CorruptSSTableException(e, dfile.getPath());
             }
         }
 
@@ -238,17 +206,9 @@ public class SSTableScanner implements ICompactionScanner
 
         public boolean hasNext()
         {
-            try
-            {
-                if (row == null)
-                    return !ifile.isEOF();
-                return nextKey != null;
-            }
-            catch (IOException e)
-            {
-                sstable.markSuspect();
-                throw new RuntimeException(e);
-            }
+            if (row == null)
+                return !ifile.isEOF();
+            return nextKey != null;
         }
 
         public OnDiskAtomIterator next()
@@ -286,7 +246,7 @@ public class SSTableScanner implements ICompactionScanner
             catch (IOException e)
             {
                 sstable.markSuspect();
-                throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e);
+                throw new CorruptSSTableException(e, ifile.getPath());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 43cd42c..9207276 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -23,8 +23,14 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
+
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.TreeMapBackedSortedColumns;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -69,7 +75,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                                        AbstractType<?> comparator,
                                        AbstractType<?> subComparator,
                                        int bufferSizeInMB,
-                                       CompressionParameters compressParameters) throws IOException
+                                       CompressionParameters compressParameters)
     {
         super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
@@ -82,7 +88,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                                        String columnFamily,
                                        AbstractType<?> comparator,
                                        AbstractType<?> subComparator,
-                                       int bufferSizeInMB) throws IOException
+                                       int bufferSizeInMB)
     {
         this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null));
     }
@@ -143,6 +149,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         catch (InterruptedException e)
         {
             throw new RuntimeException(e);
+
         }
         buffer = new Buffer();
         currentSize = 0;
@@ -156,7 +163,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
             if (diskWriter.exception instanceof IOException)
                 throw (IOException) diskWriter.exception;
             else
-                throw new RuntimeException(diskWriter.exception);
+                throw Throwables.propagate(diskWriter.exception);
         }
     }
 
@@ -165,7 +172,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
     private class DiskWriter extends Thread
     {
-        volatile Exception exception = null;
+        volatile Throwable exception = null;
 
         public void run()
         {
@@ -184,7 +191,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                     writer.closeAndOpenReader();
                 }
             }
-            catch (Exception e)
+            catch (Throwable e)
             {
                 if (writer != null)
                     writer.abort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index e5ad2f9..d0b1b4a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.IOException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSError;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -54,19 +54,19 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
                                String keyspace,
                                String columnFamily,
                                AbstractType<?> comparator,
-                               AbstractType<?> subComparator) throws IOException
+                               AbstractType<?> subComparator)
     {
         this(directory,
              new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner);
     }
 
-    public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
     {
         super(directory, metadata, partitioner);
         writer = getWriter();
     }
 
-    public void close() throws IOException
+    public void close()
     {
         try
         {
@@ -74,14 +74,14 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
                 writeRow(currentKey, columnFamily);
             writer.closeAndOpenReader();
         }
-        catch (IOException e)
+        catch (FSError e)
         {
             writer.abort();
             throw e;
         }
     }
 
-    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily)
     {
         writer.append(key, columnFamily);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 05f5b25..e7128f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -22,16 +22,16 @@ import java.util.*;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Sets;
-
-import org.apache.cassandra.config.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
@@ -50,7 +50,7 @@ public class SSTableWriter extends SSTable
     private final SSTableMetadata.Collector sstableMetadataCollector;
     private final TypeSizes typeSizes = TypeSizes.NATIVE;
 
-    public SSTableWriter(String filename, long keyCount) throws IOException
+    public SSTableWriter(String filename, long keyCount)
     {
         this(filename,
              keyCount,
@@ -80,7 +80,7 @@ public class SSTableWriter extends SSTable
                          long keyCount,
                          CFMetaData metadata,
                          IPartitioner<?> partitioner,
-                         SSTableMetadata.Collector sstableMetadataCollector) throws IOException
+                         SSTableMetadata.Collector sstableMetadataCollector)
     {
         super(Descriptor.fromFilename(filename),
               components(metadata),
@@ -114,23 +114,17 @@ public class SSTableWriter extends SSTable
         iwriter.mark();
     }
 
-    public void resetAndTruncate()
+    // NOT necessarily an FS error - not throwing FSWE.
+    public void resetAndTruncate() throws IOException
     {
-        try
-        {
-            dataFile.resetAndTruncate(dataMark);
-            iwriter.resetAndTruncate();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        dataFile.resetAndTruncate(dataMark);
+        iwriter.resetAndTruncate();
     }
 
     /**
      * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
      */
-    private long beforeAppend(DecoratedKey decoratedKey) throws IOException
+    private long beforeAppend(DecoratedKey decoratedKey)
     {
         assert decoratedKey != null : "Keys must not be null";
         if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
@@ -138,7 +132,7 @@ public class SSTableWriter extends SSTable
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index) throws IOException
+    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
     {
         lastWrittenKey = decoratedKey;
         this.last = lastWrittenKey;
@@ -153,61 +147,89 @@ public class SSTableWriter extends SSTable
         return entry;
     }
 
-    public RowIndexEntry append(AbstractCompactedRow row) throws IOException
+    public RowIndexEntry append(AbstractCompactedRow row)
     {
-        long currentPosition = beforeAppend(row.key);
-        ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
-        long dataStart = dataFile.getFilePointer();
-        long dataSize = row.write(dataFile.stream);
-        assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
-                : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
-        sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+        try
+        {
+            long currentPosition = beforeAppend(row.key);
+            ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
+            long dataStart = dataFile.getFilePointer();
+            long dataSize = row.write(dataFile.stream);
+            assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
+                    : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+            sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+            return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
     }
 
-    public void append(DecoratedKey decoratedKey, ColumnFamily cf) throws IOException
+    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
     {
         long startPosition = beforeAppend(decoratedKey);
-        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-
-        // Since the columnIndex may insert RangeTombstone marker, computing
-        // the size of the data is tricky.
-        DataOutputBuffer buffer = new DataOutputBuffer();
-
-        // build column index && write columns
-        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, cf.getColumnCount(), buffer);
-        ColumnIndex index = builder.build(cf);
-
-        TypeSizes typeSizes = TypeSizes.NATIVE;
-        long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
-        dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
-
-        // Write deletion infos + column count
-        DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
-        dataFile.stream.writeInt(builder.writtenAtomCount());
-        dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
-
-        afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
-
-        sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+        try
+        {
+            ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
+
+            // Since the columnIndex may insert RangeTombstone marker, computing
+            // the size of the data is tricky.
+            DataOutputBuffer buffer = new DataOutputBuffer();
+
+            // build column index && write columns
+            ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, cf.getColumnCount(), buffer);
+            ColumnIndex index = builder.build(cf);
+
+            TypeSizes typeSizes = TypeSizes.NATIVE;
+            long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
+            dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
+
+            // Write deletion infos + column count
+            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
+            dataFile.stream.writeInt(builder.writtenAtomCount());
+            dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
+            afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+            sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
     }
 
+    /**
+     * @throws IOException if a read from the DataInput fails
+     * @throws FSWriteError if a write to the dataFile fails
+     */
     public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
     {
         long currentPosition = beforeAppend(key);
-        ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
-        long dataStart = dataFile.getFilePointer();
-
-        // write row size
-        dataFile.stream.writeLong(dataSize);
+        long dataStart;
+        try
+        {
+            ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
+            dataStart = dataFile.getFilePointer();
+            // write row size
+            dataFile.stream.writeLong(dataSize);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
 
-        // cf data
         DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
-        DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
-
-        // column size
         int columnCount = in.readInt();
-        dataFile.stream.writeInt(columnCount);
+
+        try
+        {
+            DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
+            dataFile.stream.writeInt(columnCount);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
 
         // deserialize each column to obtain maxTimestamp and immediately serialize it.
         long maxTimestamp = Long.MIN_VALUE;
@@ -245,7 +267,14 @@ public class SSTableWriter extends SSTable
                 tombstones.update(deletionTime);
             }
             maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
-            columnIndexer.add(atom); // This write the atom on disk too
+            try
+            {
+                columnIndexer.add(atom); // This write the atom on disk too
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, dataFile.getPath());
+            }
         }
 
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
@@ -267,30 +296,38 @@ public class SSTableWriter extends SSTable
         FileUtils.closeQuietly(iwriter);
         FileUtils.closeQuietly(dataFile);
 
+        Set<Component> components = SSTable.componentsFor(descriptor);
         try
         {
-            Set<Component> components = SSTable.componentsFor(descriptor);
             if (!components.isEmpty())
                 SSTable.delete(descriptor, components);
         }
-        catch (Exception e)
+        catch (FSWriteError e)
         {
             logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+            throw e;
         }
     }
 
-    public SSTableReader closeAndOpenReader() throws IOException
+    public SSTableReader closeAndOpenReader()
     {
         return closeAndOpenReader(System.currentTimeMillis());
     }
 
-    public SSTableReader closeAndOpenReader(long maxDataAge) throws IOException
+    public SSTableReader closeAndOpenReader(long maxDataAge)
     {
         // index and filter
         iwriter.close();
 
-        // main data, close will truncate if necessary
-        dataFile.close();
+        try
+        {
+            // main data, close will truncate if necessary
+            dataFile.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
 
         // write sstable statistics
         SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
@@ -322,7 +359,7 @@ public class SSTableWriter extends SSTable
         return sstable;
     }
 
-    private void maybeWriteDigest() throws IOException
+    private void maybeWriteDigest()
     {
         byte[] digest = dataFile.digest();
         if (digest == null)
@@ -333,15 +370,29 @@ public class SSTableWriter extends SSTable
         Descriptor newdesc = descriptor.asTemporary(false);
         String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
         String dataFileName = tmp[tmp.length - 1];
-        out.write(String.format("%s  %s", Hex.bytesToHex(digest), dataFileName).getBytes());
-        out.close();
+        try
+        {
+            out.write(String.format("%s  %s", Hex.bytesToHex(digest), dataFileName).getBytes());
+            out.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
     }
 
-    private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata) throws IOException
+    private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
-        SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
-        out.close();
+        try
+        {
+            SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
+            out.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
     }
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
@@ -353,19 +404,16 @@ public class SSTableWriter extends SSTable
 
     public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
     {
-        try
+        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
         {
-            // do -Data last because -Data present should mean the sstable was completely renamed before crash
-            for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
-                FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
-            FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
-            // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
-            FBUtilities.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
+            FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
         }
+
+        // do -Data last because -Data present should mean the sstable was completely renamed before crash
+        FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+        // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+        FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
     }
 
     public long getFilePointer()
@@ -389,7 +437,7 @@ public class SSTableWriter extends SSTable
         public final Filter bf;
         private FileMark mark;
 
-        IndexWriter(long keyCount) throws IOException
+        IndexWriter(long keyCount)
         {
             indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)),
                                               !DatabaseDescriptor.populateIOCacheOnFlush());
@@ -407,12 +455,20 @@ public class SSTableWriter extends SSTable
                                   : FilterFactory.getFilter(keyCount, fpChance);
         }
 
-        public void append(DecoratedKey key, RowIndexEntry indexEntry) throws IOException
+        public void append(DecoratedKey key, RowIndexEntry indexEntry)
         {
             bf.add(key.key);
             long indexPosition = indexFile.getFilePointer();
-            ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
-            RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream);
+            try
+            {
+                ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
+                RowIndexEntry.serializer.serialize(indexEntry, indexFile.stream);
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, indexFile.getPath());
+            }
+
             if (logger.isTraceEnabled())
                 logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition);
 
@@ -423,20 +479,28 @@ public class SSTableWriter extends SSTable
         /**
          * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
          */
-        public void close() throws IOException
+        public void close()
         {
-            // bloom filter
-            FileOutputStream fos = new FileOutputStream(descriptor.filenameFor(SSTable.COMPONENT_FILTER));
-            DataOutputStream stream = new DataOutputStream(fos);
-            FilterFactory.serialize(bf, stream, descriptor.version.filterType);
-            stream.flush();
-            fos.getFD().sync();
-            stream.close();
-
-            // index
-            long position = indexFile.getFilePointer();
-            indexFile.close(); // calls force
-            FileUtils.truncate(indexFile.getPath(), position);
+            String path = descriptor.filenameFor(SSTable.COMPONENT_FILTER);
+            try
+            {
+                // bloom filter
+                FileOutputStream fos = new FileOutputStream(path);
+                DataOutputStream stream = new DataOutputStream(fos);
+                FilterFactory.serialize(bf, stream, descriptor.version.filterType);
+                stream.flush();
+                fos.getFD().sync();
+                stream.close();
+
+                // index
+                long position = indexFile.getFilePointer();
+                indexFile.close(); // calls force
+                FileUtils.truncate(indexFile.getPath(), position);
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, path);
+            }
 
             // finalize in-memory index state
             summary.complete();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 976f9f7..2c8b89e 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.io.util;
 
 import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
 
 public class BufferedSegmentedFile extends SegmentedFile
 {
@@ -53,16 +51,9 @@ public class BufferedSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        try
-        {
-            RandomAccessReader file = RandomAccessReader.open(new File(path));
-            file.seek(position);
-            return file;
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        RandomAccessReader file = RandomAccessReader.open(new File(path));
+        file.seek(position);
+        return file;
     }
 
     public void cleanup()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
index 2c73084..e01fd91 100644
--- a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
+++ b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
@@ -252,7 +252,7 @@ class ColumnIterator implements Iterator<Map.Entry<ByteBuffer, IColumn>>
         }
         catch (IOException e)
         {
-            throw new IOError(e);
+            throw new IOError(e); // can't throw a more detailed error here, and can't rethrow IOException: Iterator.next() doesn't declare checked exceptions.
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 806b114..d82fbae 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,9 +17,6 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.IOError;
-import java.io.IOException;
-
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
@@ -57,16 +54,9 @@ public class CompressedSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        try
-        {
-            RandomAccessReader file = CompressedRandomAccessReader.open(path, metadata);
-            file.seek(position);
-            return file;
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        RandomAccessReader file = CompressedRandomAccessReader.open(path, metadata);
+        file.seek(position);
+        return file;
     }
 
     public void cleanup()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 354a835..fec0cff 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -25,9 +25,10 @@ import java.util.Comparator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.WrappedRunnable;
-
+import org.apache.cassandra.utils.CLibrary;
 
 public class FileUtils
 {
@@ -39,34 +40,79 @@ public class FileUtils
 
     private static final DecimalFormat df = new DecimalFormat("#.##");
 
-    public static void deleteWithConfirm(String file) throws IOException
+    public static void createHardLink(File from, File to)
+    {
+        if (to.exists())
+            throw new RuntimeException("Tried to create duplicate hard link to " + to);
+        if (!from.exists())
+            throw new RuntimeException("Tried to hard link to file that does not exist " + from);
+
+        try
+        {
+            CLibrary.createHardLink(from, to);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, to);
+        }
+    }
+
+    public static File createTempFile(String prefix, String suffix, File directory)
+    {
+        try
+        {
+            return File.createTempFile(prefix, suffix, directory);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, directory);
+        }
+    }
+
+    public static File createTempFile(String prefix, String suffix)
+    {
+        return createTempFile(prefix, suffix, new File(System.getProperty("java.io.tmpdir")));
+    }
+
+    public static void deleteWithConfirm(String file)
     {
         deleteWithConfirm(new File(file));
     }
 
-    public static void deleteWithConfirm(File file) throws IOException
+    public static void deleteWithConfirm(File file)
     {
         assert file.exists() : "attempted to delete non-existing file " + file.getName();
         if (logger.isDebugEnabled())
             logger.debug("Deleting " + file.getName());
         if (!file.delete())
-        {
-            throw new IOException("Failed to delete " + file.getAbsolutePath());
-        }
+            throw new FSWriteError(new IOException("Failed to delete " + file.getAbsolutePath()), file);
+    }
+
+    public static void renameWithOutConfirm(String from, String to)
+    {
+        new File(from).renameTo(new File(to));
     }
 
-    public static void renameWithConfirm(File from, File to) throws IOException
+    public static void renameWithConfirm(String from, String to)
+    {
+        renameWithConfirm(new File(from), new File(to));
+    }
+
+    public static void renameWithConfirm(File from, File to)
     {
         assert from.exists();
         if (logger.isDebugEnabled())
             logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
+        // this is not FSWriteError because usually, when we see it, it's because we didn't close the file
+        // before renaming it, and Windows is picky about that.
         if (!from.renameTo(to))
-            throw new IOException(String.format("Failed to rename %s to %s", from.getPath(), to.getPath()));
+            throw new RuntimeException(String.format("Failed to rename %s to %s", from.getPath(), to.getPath()));
     }
 
-    public static void truncate(String path, long size) throws IOException
+    public static void truncate(String path, long size)
     {
         RandomAccessFile file;
+
         try
         {
             file = new RandomAccessFile(path, "rw");
@@ -75,13 +121,18 @@ public class FileUtils
         {
             throw new RuntimeException(e);
         }
+
         try
         {
             file.getChannel().truncate(size);
         }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, path);
+        }
         finally
         {
-            file.close();
+            closeQuietly(file);
         }
     }
 
@@ -123,6 +174,30 @@ public class FileUtils
             throw e;
     }
 
+    public static String getCanonicalPath(String filename)
+    {
+        try
+        {
+            return new File(filename).getCanonicalPath();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, filename);
+        }
+    }
+
+    public static String getCanonicalPath(File file)
+    {
+        try
+        {
+            return file.getCanonicalPath();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, file);
+        }
+    }
+
     public static class FileComparator implements Comparator<File>
     {
         public int compare(File f, File f2)
@@ -131,19 +206,17 @@ public class FileUtils
         }
     }
 
-    public static void createDirectory(String directory) throws IOException
+    public static void createDirectory(String directory)
     {
         createDirectory(new File(directory));
     }
 
-    public static void createDirectory(File directory) throws IOException
+    public static void createDirectory(File directory)
     {
         if (!directory.exists())
         {
             if (!directory.mkdirs())
-            {
-                throw new IOException("unable to mkdirs " + directory);
-            }
+                throw new FSWriteError(new IOException("Failed to mkdirs " + directory), directory);
         }
     }
 
@@ -163,9 +236,9 @@ public class FileUtils
 
     public static void deleteAsync(final String file)
     {
-        Runnable runnable = new WrappedRunnable()
+        Runnable runnable = new Runnable()
         {
-            protected void runMayThrow() throws IOException
+            public void run()
             {
                 deleteWithConfirm(new File(file));
             }
@@ -210,9 +283,9 @@ public class FileUtils
     /**
      * Deletes all files and subdirectories under "dir".
      * @param dir Directory to be deleted
-     * @throws IOException if any part of the tree cannot be deleted
+     * @throws FSWriteError if any part of the tree cannot be deleted
      */
-    public static void deleteRecursive(File dir) throws IOException
+    public static void deleteRecursive(File dir)
     {
         if (dir.isDirectory())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ae81a08..f3620e3 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -17,12 +17,7 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.lang.reflect.Method;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -33,6 +28,8 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.io.FSReadError;
+
 public class MmappedSegmentedFile extends SegmentedFile
 {
     private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
@@ -83,17 +80,10 @@ public class MmappedSegmentedFile extends SegmentedFile
         }
 
         // not mmap'd: open a braf covering the segment
-        try
-        {
-            // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
-            RandomAccessReader file = RandomAccessReader.open(new File(path));
-            file.seek(position);
-            return file;
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
+        RandomAccessReader file = RandomAccessReader.open(new File(path));
+        file.seek(position);
+        return file;
     }
 
     public static void initCleaner()
@@ -205,10 +195,19 @@ public class MmappedSegmentedFile extends SegmentedFile
         {
             int segcount = boundaries.size() - 1;
             Segment[] segments = new Segment[segcount];
-            RandomAccessFile raf = null;
+            RandomAccessFile raf;
+
             try
             {
                 raf = new RandomAccessFile(path, "r");
+            }
+            catch (FileNotFoundException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            try
+            {
                 for (int i = 0; i < segcount; i++)
                 {
                     long start = boundaries.get(i);
@@ -221,7 +220,7 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
             catch (IOException e)
             {
-                throw new IOError(e);
+                throw new FSReadError(e, path);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index c7ed792..ba7587f 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -17,14 +17,11 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.utils.CLibrary;
 
 public class RandomAccessReader extends RandomAccessFile implements FileDataInput
@@ -61,7 +58,8 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     private final long fileLength;
 
-    public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws IOException
+    // used in tests
+    public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws FileNotFoundException
     {
         super(file, "r");
 
@@ -74,66 +72,93 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         buffer = new byte[bufferSize];
 
         this.skipIOCache = skipIOCache;
-        fd = CLibrary.getfd(getFD());
+        try
+        {
+            fd = CLibrary.getfd(getFD());
+        }
+        catch (IOException e)
+        {
+            // fd == null here is not supposed to happen, so rethrow as unchecked
+            throw new RuntimeException(e);
+        }
 
         // we can cache file length in read-only mode
-        fileLength = channel.size();
+        try
+        {
+            fileLength = channel.size();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, filePath);
+        }
         validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
     }
 
-    public static RandomAccessReader open(File file, boolean skipIOCache) throws IOException
+    public static RandomAccessReader open(File file, boolean skipIOCache)
     {
         return open(file, DEFAULT_BUFFER_SIZE, skipIOCache);
     }
 
-    public static RandomAccessReader open(File file) throws IOException
+    public static RandomAccessReader open(File file)
     {
         return open(file, DEFAULT_BUFFER_SIZE, false);
     }
 
-    public static RandomAccessReader open(File file, int bufferSize) throws IOException
+    public static RandomAccessReader open(File file, int bufferSize)
     {
         return open(file, bufferSize, false);
     }
 
-    public static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache) throws IOException
+    public static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache)
     {
-        return new RandomAccessReader(file, bufferSize, skipIOCache);
+        try
+        {
+            return new RandomAccessReader(file, bufferSize, skipIOCache);
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     // convert open into open
-    public static RandomAccessReader open(SequentialWriter writer) throws IOException
+    public static RandomAccessReader open(SequentialWriter writer)
     {
         return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE);
     }
 
     /**
      * Read data from file starting from current currentOffset to populate buffer.
-     * @throws IOException on any I/O error.
      */
-    protected void reBuffer() throws IOException
+    protected void reBuffer()
     {
         resetBuffer();
 
-        if (bufferOffset >= channel.size())
-            return;
+        try
+        {
+            if (bufferOffset >= channel.size())
+                return;
+
+            channel.position(bufferOffset); // setting channel position
 
-        channel.position(bufferOffset); // setting channel position
+            int read = 0;
 
-        int read = 0;
+            while (read < buffer.length)
+            {
+                int n = super.read(buffer, read, buffer.length - read);
+                if (n < 0)
+                    break;
+                read += n;
+            }
 
-        while (read < buffer.length)
+            validBufferBytes = read;
+            bytesSinceCacheFlush += read;
+        }
+        catch (IOException e)
         {
-            int n = super.read(buffer, read, buffer.length - read);
-            if (n < 0)
-                break;
-            read += n;
+            throw new FSReadError(e, filePath);
         }
 
-        validBufferBytes = read;
-
-        bytesSinceCacheFlush += read;
-
         if (skipIOCache && bytesSinceCacheFlush >= CACHE_FLUSH_INTERVAL_IN_BYTES)
         {
             // with random I/O we can't control what we are skipping so
@@ -155,7 +180,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }
 
-    public void reset() throws IOException
+    public void reset()
     {
         seek(markedPointer);
     }
@@ -173,7 +198,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return new BufferedRandomAccessFileMark(markedPointer);
     }
 
-    public void reset(FileMark mark) throws IOException
+    public void reset(FileMark mark)
     {
         assert mark instanceof BufferedRandomAccessFileMark;
         seek(((BufferedRandomAccessFileMark) mark).pointer);
@@ -189,14 +214,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     /**
      * @return true if there is no more data to read
-     * @throws IOException on any I/O error.
      */
-    public boolean isEOF() throws IOException
+    public boolean isEOF()
     {
         return getFilePointer() == length();
     }
 
-    public long bytesRemaining() throws IOException
+    public long bytesRemaining()
     {
         return length() - getFilePointer();
     }
@@ -213,14 +237,21 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         buffer = null;
 
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        super.close();
+        try
+        {
+            super.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, filePath);
+        }
     }
 
     @Override
@@ -243,14 +274,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     }
 
     @Override
-    public void seek(long newPosition) throws IOException
+    public void seek(long newPosition)
     {
         if (newPosition < 0)
             throw new IllegalArgumentException("new position should not be negative");
 
         if (newPosition > length()) // it is save to call length() in read-only mode
-            throw new EOFException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
-                                                 newPosition, getPath(), length()));
+            throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+                                                             newPosition, getPath(), length()));
 
         current = newPosition;
 
@@ -261,10 +292,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     @Override
     // -1 will be returned if there is nothing to read; higher-level methods like readInt
     // or readFully (from RandomAccessFile) will throw EOFException but this should not
-    public int read() throws IOException
+    public int read()
     {
         if (buffer == null)
-            throw new ClosedChannelException();
+            throw new AssertionError("Attempted to read from closed RAR");
 
         if (isEOF())
             return -1; // required by RandomAccessFile
@@ -278,7 +309,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     }
 
     @Override
-    public int read(byte[] buffer) throws IOException
+    public int read(byte[] buffer)
     {
         return read(buffer, 0, buffer.length);
     }
@@ -286,10 +317,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     @Override
     // -1 will be returned if there is nothing to read; higher-level methods like readInt
     // or readFully (from RandomAccessFile) will throw EOFException but this should not
-    public int read(byte[] buff, int offset, int length) throws IOException
+    public int read(byte[] buff, int offset, int length)
     {
         if (buffer == null)
-            throw new ClosedChannelException();
+            throw new AssertionError("Attempted to read from closed RAR");
 
         if (length == 0)
             return 0;
@@ -315,36 +346,48 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return toCopy;
     }
 
-    public ByteBuffer readBytes(int length) throws IOException
+    public ByteBuffer readBytes(int length) throws EOFException
     {
         assert length >= 0 : "buffer length should not be negative: " + length;
 
         byte[] buff = new byte[length];
-        readFully(buff); // reading data buffer
+
+        try
+        {
+            readFully(buff); // reading data buffer
+        }
+        catch (EOFException e)
+        {
+            throw e;
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, filePath);
+        }
 
         return ByteBuffer.wrap(buff);
     }
 
     @Override
-    public long length() throws IOException
+    public long length()
     {
         return fileLength;
     }
 
     @Override
-    public void write(int value) throws IOException
+    public void write(int value)
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void write(byte[] buffer) throws IOException
+    public void write(byte[] buffer)
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void write(byte[] buffer, int offset, int length) throws IOException
+    public void write(byte[] buffer, int offset, int length)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 03de78b..1dfc1bc 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.util;
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.IOError;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.util.Iterator;
@@ -27,6 +26,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -164,7 +164,7 @@ public abstract class SegmentedFile
             }
             catch (IOException e)
             {
-                throw new IOError(e);
+                throw new FSReadError(e, path);
             }
             return segment;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 8b78730..a80c687 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.CLibrary;
 
 public class SequentialWriter extends OutputStream
@@ -60,9 +61,16 @@ public class SequentialWriter extends OutputStream
     public final DataOutputStream stream;
     private MessageDigest digest;
 
-    public SequentialWriter(File file, int bufferSize, boolean skipIOCache) throws IOException
+    public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
     {
-        out = new RandomAccessFile(file, "rw");
+        try
+        {
+            out = new RandomAccessFile(file, "rw");
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
 
         filePath = file.getAbsolutePath();
 
@@ -70,22 +78,31 @@ public class SequentialWriter extends OutputStream
         this.skipIOCache = skipIOCache;
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
-        fd = CLibrary.getfd(out.getFD());
+
+        try
+        {
+            fd = CLibrary.getfd(out.getFD());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e); // shouldn't happen
+        }
+
         directoryFD = CLibrary.tryOpenDirectory(file.getParent());
         stream = new DataOutputStream(this);
     }
 
-    public static SequentialWriter open(File file) throws IOException
+    public static SequentialWriter open(File file)
     {
         return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
     }
 
-    public static SequentialWriter open(File file, boolean skipIOCache) throws IOException
+    public static SequentialWriter open(File file, boolean skipIOCache)
     {
         return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
     }
 
-    public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache) throws IOException
+    public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
     {
         return new SequentialWriter(file, bufferSize, skipIOCache);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 7966605..1b0ae05 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -24,12 +24,11 @@ import java.net.Socket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.xerial.snappy.SnappyInputStream;
-
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
+import org.xerial.snappy.SnappyInputStream;
 
 public class IncomingTcpConnection extends Thread
 {
@@ -76,7 +75,7 @@ public class IncomingTcpConnection extends Thread
         }
         catch (IOException e)
         {
-            logger.debug("IOError reading from socket; closing", e);
+            logger.debug("IOException reading from socket; closing", e);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index 62316c8..2d51590 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.service;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IVerbHandler;
@@ -24,28 +27,19 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
+
     public void doVerb(MessageIn<SnapshotCommand> message, String id)
     {
-        try
-        {
-            SnapshotCommand command = message.payload;
-            if (command.clear_snapshot)
-                Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
-            else
-                Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
-            if (logger.isDebugEnabled())
-                logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
-            MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
-        }
-        catch (Exception ex)
-        {
-            throw new RuntimeException(ex);
-        }
+        SnapshotCommand command = message.payload;
+        if (command.clear_snapshot)
+            Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
+        else
+            Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
+        if (logger.isDebugEnabled())
+            logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.from);
+        MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4af399d..9246716 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.File;
-import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
@@ -354,9 +353,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         {
             Thread.sleep(delay);
         }
-        catch (Exception ex)
+        catch (InterruptedException e)
         {
-            throw new IOError(ex);
+            throw new AssertionError(e);
         }
 
         Schema.instance.updateVersionAndAnnounce();
@@ -703,7 +702,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
-    public synchronized void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException
+    public synchronized void joinRing() throws IOException, ConfigurationException
     {
         if (!joined)
         {
@@ -1846,34 +1845,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         return stringify(Gossiper.instance.getUnreachableMembers());
     }
 
-    private static String getCanonicalPath(String filename)
-    {
-        try
-        {
-            return new File(filename).getCanonicalPath();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
     public String[] getAllDataFileLocations()
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
         for (int i = 0; i < locations.length; i++)
-            locations[i] = getCanonicalPath(locations[i]);
+            locations[i] = FileUtils.getCanonicalPath(locations[i]);
         return locations;
     }
 
     public String getCommitLogLocation()
     {
-        return getCanonicalPath(DatabaseDescriptor.getCommitLogLocation());
+        return FileUtils.getCanonicalPath(DatabaseDescriptor.getCommitLogLocation());
     }
 
     public String getSavedCachesLocation()
     {
-        return getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
+        return FileUtils.getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
     }
 
     private List<String> stringify(Iterable<InetAddress> endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 075c24c..43d053e 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
 
-import com.ning.compress.lzf.LZFInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +34,15 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import com.ning.compress.lzf.LZFInputStream;
 
 public class IncomingStreamReader
 {
@@ -82,6 +82,9 @@ public class IncomingStreamReader
         }
     }
 
+    /**
+     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
     public void read() throws IOException
     {
         if (remoteFile != null)
@@ -111,6 +114,9 @@ public class IncomingStreamReader
         session.closeIfFinished();
     }
 
+    /**
+     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
     private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
     {
         ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
@@ -139,7 +145,7 @@ public class IncomingStreamReader
                     {
                         // need to update row cache
                         // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
-                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
+                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.desc.baseFilename(), key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
                         PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
                         // We don't expire anything so the row shouldn't be empty
                         assert !row.isEmpty();
@@ -164,7 +170,7 @@ public class IncomingStreamReader
             }
             return writer.closeAndOpenReader();
         }
-        catch (Exception e)
+        catch (Throwable e)
         {
             writer.abort();
             if (e instanceof IOException)
@@ -174,7 +180,7 @@ public class IncomingStreamReader
         }
     }
 
-    private void retry() throws IOException
+    private void retry()
     {
         /* Ask the source node to re-stream this file. */
         session.retry(remoteFile);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e11838c..2c812e5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -27,8 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +37,8 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 /** each context gets its own StreamInSession. So there may be >1 Session per host */
 public class StreamInSession extends AbstractStreamSession
@@ -142,7 +142,7 @@ public class StreamInSession extends AbstractStreamSession
         logger.debug("ack {} sent for {}", reply, remoteFile);
     }
 
-    public void retry(PendingFile remoteFile) throws IOException
+    public void retry(PendingFile remoteFile)
     {
         retries++;
         if (retries > DatabaseDescriptor.getMaxStreamingRetries())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index aa36958..2ade0c6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.IOError;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.Future;
@@ -90,11 +88,10 @@ public class StreamOut
      * Flushes matching column families from the given keyspace, or all columnFamilies
      * if the cf list is empty.
      */
-    private static void flushSSTables(Iterable<ColumnFamilyStore> stores) throws IOException
+    private static void flushSSTables(Iterable<ColumnFamilyStore> stores)
     {
         logger.info("Flushing memtables for {}...", stores);
-        List<Future<?>> flushes;
-        flushes = new ArrayList<Future<?>>();
+        List<Future<?>> flushes = new ArrayList<Future<?>>();
         for (ColumnFamilyStore cfstore : stores)
         {
             Future<?> flush = cfstore.forceFlush();
@@ -110,28 +107,20 @@ public class StreamOut
     public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
     {
         assert ranges.size() > 0;
-
         logger.info("Beginning transfer to {}", session.getHost());
         logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-        try
-        {
-            flushSSTables(cfses);
-            Iterable<SSTableReader> sstables = Collections.emptyList();
-            for (ColumnFamilyStore cfStore : cfses)
-                sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
-            transferSSTables(session, sstables, ranges, type);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        flushSSTables(cfses);
+        Iterable<SSTableReader> sstables = Collections.emptyList();
+        for (ColumnFamilyStore cfStore : cfses)
+            sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced());
+        transferSSTables(session, sstables, ranges, type);
     }
 
     /**
      * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
      * You should probably call transferRanges instead. This moreover assumes that references have been acquired on the sstables.
      */
-    public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type) throws IOException
+    public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index e695df0..504c15d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -17,19 +17,18 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * This class manages the streaming of multiple files one after the other.
@@ -115,7 +114,7 @@ public class StreamOutSession extends AbstractStreamSession
         MessagingService.instance().stream(new StreamHeader(table, getSessionId(), pf), getHost());
     }
 
-    public void startNext() throws IOException
+    public void startNext()
     {
         assert files.containsKey(currentFile);
         files.get(currentFile).sstable.releaseReference();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 1c5ec4b..714f76a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -17,10 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-
-import java.io.IOError;
-import java.io.IOException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,42 +29,35 @@ public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
 
     public void doVerb(MessageIn<StreamReply> message, String id)
     {
-        try
+        StreamReply reply = message.payload;
+        logger.debug("Received StreamReply {}", reply);
+        StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
+        if (session == null)
         {
-            StreamReply reply = message.payload;
-            logger.debug("Received StreamReply {}", reply);
-            StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
-            if (session == null)
-            {
-                logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
-                return;
-            }
-
-            switch (reply.action)
-            {
-                case FILE_FINISHED:
-                    logger.info("Successfully sent {} to {}", reply.file, message.from);
-                    session.validateCurrentFile(reply.file);
-                    session.startNext();
-                    break;
-                case FILE_RETRY:
-                    session.validateCurrentFile(reply.file);
-                    logger.info("Need to re-stream file {} to {}", reply.file, message.from);
-                    session.retry();
-                    break;
-                case SESSION_FINISHED:
-                    session.close(true);
-                    break;
-                case SESSION_FAILURE:
-                    session.close(false);
-                    break;
-                default:
-                    throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
-            }
+            logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
+            return;
         }
-        catch (IOException ex)
+
+        switch (reply.action)
         {
-            throw new IOError(ex);
+            case FILE_FINISHED:
+                logger.info("Successfully sent {} to {}", reply.file, message.from);
+                session.validateCurrentFile(reply.file);
+                session.startNext();
+                break;
+            case FILE_RETRY:
+                session.validateCurrentFile(reply.file);
+                logger.info("Need to re-stream file {} to {}", reply.file, message.from);
+                session.retry();
+                break;
+            case SESSION_FINISHED:
+                session.close(true);
+                break;
+            case SESSION_FAILURE:
+                session.close(false);
+                break;
+            default:
+                throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index bff8966..c63d119 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -148,17 +148,9 @@ public class StreamingRepairTask implements Runnable
             public void onSuccess()
             {
                 if (outstanding.decrementAndGet() > 0)
-                    // waiting on more calls
-                    return;
+                    return; // waiting on more calls
 
-                try
-                {
-                    StreamingRepairResponse.reply(taskOwner, taskId);
-                }
-                catch (IOException e)
-                {
-                    throw new IOError(e);
-                }
+                StreamingRepairResponse.reply(taskOwner, taskId);
             }
 
             public void onFailure() {}
@@ -222,7 +214,7 @@ public class StreamingRepairTask implements Runnable
                 task.callback.onSuccess();
         }
 
-        private static void reply(InetAddress remote, UUID taskid) throws IOException
+        private static void reply(InetAddress remote, UUID taskid)
         {
+            logger.info(String.format("[streaming task #%s] task succeeded, forwarding response to %s", taskid, remote));
             MessageOut<UUID> message = new MessageOut<UUID>(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDGen.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 8fbaea1..8eadee8 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -89,7 +89,7 @@ public class CompressedInputStream extends InputStream
         return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
     }
 
-    public void position(long position) throws IOException
+    public void position(long position)
     {
         assert position >= current : "stream can only read forward.";
         current = position;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index c606ef5..a13440e 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -335,19 +335,12 @@ public class ByteBufferUtil
         }
     }
 
-    public static void writeWithShortLength(ByteBuffer buffer, DataOutput out)
+    public static void writeWithShortLength(ByteBuffer buffer, DataOutput out) throws IOException
     {
         int length = buffer.remaining();
         assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
-        try
-        {
-            out.writeShort(length);
-            write(buffer, out); // writing data bytes to output source
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
+        out.writeShort(length);
+        write(buffer, out); // writing data bytes to output source
     }
 
     public static ByteBuffer readWithLength(DataInput in) throws IOException
@@ -455,7 +448,7 @@ public class ByteBufferUtil
 
         return new InputStream()
         {
-            public int read() throws IOException
+            public int read()
             {
                 if (!copy.hasRemaining())
                     return -1;
@@ -464,7 +457,7 @@ public class ByteBufferUtil
             }
 
             @Override
-            public int read(byte[] bytes, int off, int len) throws IOException
+            public int read(byte[] bytes, int off, int len)
             {
                 if (!copy.hasRemaining())
                     return -1;
@@ -475,7 +468,7 @@ public class ByteBufferUtil
             }
 
             @Override
-            public int available() throws IOException
+            public int available()
             {
                 return copy.remaining();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debb15ed/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 15e493f..38bf47b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -227,19 +227,6 @@ public class FBUtilities
         return messageDigest.digest();
     }
 
-    public static void renameWithConfirm(String tmpFilename, String filename) throws IOException
-    {
-        if (!new File(tmpFilename).renameTo(new File(filename)))
-        {
-            throw new IOException("rename failed of " + filename);
-        }
-    }
-
-    public static void renameWithOutConfirm(String tmpFilename, String filename) throws IOException
-    {
-        new File(tmpFilename).renameTo(new File(filename));
-    }
-
     @Deprecated
     public static void serialize(TSerializer serializer, TBase struct, DataOutput out)
     throws IOException
@@ -611,7 +598,7 @@ public class FBUtilities
         return buffer.getData();
     }
 
-    public static RuntimeException unchecked(Exception e)
+    public static RuntimeException unchecked(Throwable e)
     {
         return e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
     }


Mime
View raw message