cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject svn commit: r915463 [2/3] - in /incubator/cassandra/branches/cassandra-0.6: contrib/mutex/ contrib/word_count/src/ src/java/org/apache/cassandra/auth/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandr...
Date Tue, 23 Feb 2010 18:13:09 GMT
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -226,4 +247,4 @@
     {
         return new ColumnFamilyRecordReader();
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -188,4 +209,4 @@
     {
         return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -132,4 +153,4 @@
         w.readFields(in);
         return w;
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/DeletionService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/DeletionService.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/DeletionService.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/DeletionService.java Tue Feb 23 18:13:07 2010
@@ -1,67 +1,88 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class DeletionService
-{
-    public static final int MAX_RETRIES = 10;
-
-    public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
-
-    public static void submitDelete(final String file)
-    {
-        Runnable deleter = new WrappedRunnable()
-        {
-            @Override
-            protected void runMayThrow() throws IOException
-            {
-                FileUtils.deleteWithConfirm(new File(file));
-            }
-        };
-        executor.submit(deleter);
-    }
-
-    public static void submitDeleteWithRetry(String file)
-    {
-        submitDeleteWithRetry(file, 0);
-    }
-
-    private static void submitDeleteWithRetry(final String file, final int retryCount)
-    {
-        Runnable deleter = new WrappedRunnable()
-        {
-            @Override
-            protected void runMayThrow() throws IOException
-            {
-                if (!new File(file).delete())
-                {
-                    if (retryCount > MAX_RETRIES)
-                        throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
-                    new Thread(new Runnable()
-                    {
-                        public void run()
-                        {
-                            try
-                            {
-                                Thread.sleep(10000);
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError(e);
-                            }
-                            submitDeleteWithRetry(file, retryCount + 1);
-                        }
-                    }, "Delete submission: " + file).start();
-                }
-            }
-        };
-        executor.submit(deleter);
-    }
-}
+package org.apache.cassandra.io;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DeletionService
+{
+    public static final int MAX_RETRIES = 10;
+
+    public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
+
+    public static void submitDelete(final String file)
+    {
+        Runnable deleter = new WrappedRunnable()
+        {
+            @Override
+            protected void runMayThrow() throws IOException
+            {
+                FileUtils.deleteWithConfirm(new File(file));
+            }
+        };
+        executor.submit(deleter);
+    }
+
+    public static void submitDeleteWithRetry(String file)
+    {
+        submitDeleteWithRetry(file, 0);
+    }
+
+    private static void submitDeleteWithRetry(final String file, final int retryCount)
+    {
+        Runnable deleter = new WrappedRunnable()
+        {
+            @Override
+            protected void runMayThrow() throws IOException
+            {
+                if (!new File(file).delete())
+                {
+                    if (retryCount > MAX_RETRIES)
+                        throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
+                    new Thread(new Runnable()
+                    {
+                        public void run()
+                        {
+                            try
+                            {
+                                Thread.sleep(10000);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new AssertionError(e);
+                            }
+                            submitDeleteWithRetry(file, retryCount + 1);
+                        }
+                    }, "Delete submission: " + file).start();
+                }
+            }
+        };
+        executor.submit(deleter);
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableDeletingReference.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableDeletingReference.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableDeletingReference.java Tue Feb 23 18:13:07 2010
@@ -1,86 +1,107 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
-import java.lang.ref.PhantomReference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.io.util.FileUtils;
-
-public class SSTableDeletingReference extends PhantomReference<SSTableReader>
-{
-    private static final Logger logger = Logger.getLogger(SSTableDeletingReference.class);
-
-    private static final Timer timer = new Timer("SSTABLE-CLEANUP-TIMER");
-    public static final int RETRY_DELAY = 10000;
-
-    private final SSTableTracker tracker;
-    public final String path;
-    private final long size;
-    private boolean deleteOnCleanup;
-
-    SSTableDeletingReference(SSTableTracker tracker, SSTableReader referent, ReferenceQueue<? super SSTableReader> q)
-    {
-        super(referent, q);
-        this.tracker = tracker;
-        this.path = referent.path;
-        this.size = referent.bytesOnDisk();
-    }
-
-    public void deleteOnCleanup()
-    {
-        deleteOnCleanup = true;
-    }
-
-    public void cleanup() throws IOException
-    {
-        if (deleteOnCleanup)
-        {
-            // this is tricky because the mmapping might not have been finalized yet,
-            // and delete will fail until it is.  additionally, we need to make sure to
-            // delete the data file first, so on restart the others will be recognized as GCable
-            // even if the compaction marker gets deleted next.
-            timer.schedule(new CleanupTask(), RETRY_DELAY);
-        }
-    }
-
-    private class CleanupTask extends TimerTask
-    {
-        int attempts = 0;
-
-        @Override
-        public void run()
-        {
-            File datafile = new File(path);
-            if (!datafile.delete())
-            {
-                if (attempts++ < DeletionService.MAX_RETRIES)
-                {
-                    timer.schedule(this, RETRY_DELAY);
-                    return;
-                }
-                else
-                {
-                    throw new RuntimeException("Unable to delete " + path);
-                }
-            }
-            try
-            {
-                FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
-                FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
-                FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            tracker.spaceReclaimed(size);
-            logger.info("Deleted " + path);
-        }
-    }
-}
+package org.apache.cassandra.io;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.util.FileUtils;
+
+public class SSTableDeletingReference extends PhantomReference<SSTableReader>
+{
+    private static final Logger logger = Logger.getLogger(SSTableDeletingReference.class);
+
+    private static final Timer timer = new Timer("SSTABLE-CLEANUP-TIMER");
+    public static final int RETRY_DELAY = 10000;
+
+    private final SSTableTracker tracker;
+    public final String path;
+    private final long size;
+    private boolean deleteOnCleanup;
+
+    SSTableDeletingReference(SSTableTracker tracker, SSTableReader referent, ReferenceQueue<? super SSTableReader> q)
+    {
+        super(referent, q);
+        this.tracker = tracker;
+        this.path = referent.path;
+        this.size = referent.bytesOnDisk();
+    }
+
+    public void deleteOnCleanup()
+    {
+        deleteOnCleanup = true;
+    }
+
+    public void cleanup() throws IOException
+    {
+        if (deleteOnCleanup)
+        {
+            // this is tricky because the mmapping might not have been finalized yet,
+            // and delete will fail until it is.  additionally, we need to make sure to
+            // delete the data file first, so on restart the others will be recognized as GCable
+            // even if the compaction marker gets deleted next.
+            timer.schedule(new CleanupTask(), RETRY_DELAY);
+        }
+    }
+
+    private class CleanupTask extends TimerTask
+    {
+        int attempts = 0;
+
+        @Override
+        public void run()
+        {
+            File datafile = new File(path);
+            if (!datafile.delete())
+            {
+                if (attempts++ < DeletionService.MAX_RETRIES)
+                {
+                    timer.schedule(this, RETRY_DELAY);
+                    return;
+                }
+                else
+                {
+                    throw new RuntimeException("Unable to delete " + path);
+                }
+            }
+            try
+            {
+                FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
+                FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
+                FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            tracker.spaceReclaimed(size);
+            logger.info("Deleted " + path);
+        }
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Feb 23 18:13:07 2010
@@ -1,18 +1,39 @@
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
-    public String getPath();
-
-    public boolean isEOF() throws IOException;
-
-    public void mark();
-
-    public void reset() throws IOException;
-
-    public int bytesPastMark();
-}
+package org.apache.cassandra.io.util;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Closeable;
+
+public interface FileDataInput extends DataInput, Closeable
+{
+    public String getPath();
+
+    public boolean isEOF() throws IOException;
+
+    public void mark();
+
+    public void reset() throws IOException;
+
+    public int bytesPastMark();
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Feb 23 18:13:07 2010
@@ -1,404 +1,425 @@
-package org.apache.cassandra.io.util;
-
-import java.nio.MappedByteBuffer;
-import java.io.*;
-
-public class MappedFileDataInput extends InputStream implements FileDataInput
-{
-    private final MappedByteBuffer buffer;
-    private final String filename;
-    private int position;
-    private int markedPosition;
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename)
-    {
-        this(buffer, filename, 0);
-    }
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
-    {
-        assert buffer != null;
-        this.buffer = buffer;
-        this.filename = filename;
-        this.position = position;
-    }
-
-    // don't make this public, this is only for seeking WITHIN the current mapped segment
-    private void seekInternal(int pos) throws IOException
-    {
-        position = pos;
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return true;
-    }
-
-    @Override
-    public void mark(int ignored)
-    {
-        markedPosition = position;
-    }
-
-    @Override
-    public void reset() throws IOException
-    {
-        seekInternal(markedPosition);
-    }
-
-    public void mark()
-    {
-        mark(-1);
-    }
-
-    public int bytesPastMark()
-    {
-        assert position >= markedPosition;
-        return position - markedPosition;
-    }
-
-    public boolean isEOF() throws IOException
-    {
-        return position == buffer.capacity();
-    }
-
-    public String getPath()
-    {
-        return filename;
-    }
-
-    public int read() throws IOException
-    {
-        if (isEOF())
-            return -1;
-        return buffer.get(position++) & 0xFF;
-    }
-
-    public int skipBytes(int n) throws IOException
-    {
-        if (n <= 0)
-            return 0;
-        int oldPosition = position;
-        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
-        position = Math.min(buffer.capacity(), position + n);
-        return position - oldPosition;
-    }
-
-    /*
-     !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
-     */
-
-    /**
-     * Reads a boolean from the current position in this file. Blocks until one
-     * byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next boolean value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final boolean readBoolean() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp != 0;
-    }
-
-    /**
-     * Reads an 8-bit byte from the current position in this file. Blocks until
-     * one byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next signed 8-bit byte value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final byte readByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return (byte) temp;
-    }
-
-    /**
-     * Reads a 16-bit character from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next char value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final char readChar() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
-    }
-
-    /**
-     * Reads a 64-bit double from the current position in this file. Blocks
-     * until eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next double value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final double readDouble() throws IOException {
-        return Double.longBitsToDouble(readLong());
-    }
-
-    /**
-     * Reads a 32-bit float from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next float value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final float readFloat() throws IOException {
-        return Float.intBitsToFloat(readInt());
-    }
-
-    /**
-     * Reads bytes from this file into {@code buffer}. Blocks until {@code
-     * buffer.length} number of bytes have been read, the end of the file is
-     * reached or an exception is thrown.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public final void readFully(byte[] buffer) throws IOException {
-        readFully(buffer, 0, buffer.length);
-    }
-
-    /**
-     * Read bytes from this file into {@code buffer} starting at offset {@code
-     * offset}. This method blocks until {@code count} number of bytes have been
-     * read.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @param offset
-     *            the initial position in {@code buffer} to store the bytes read
-     *            from this file.
-     * @param count
-     *            the maximum number of bytes to store in {@code buffer}.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IndexOutOfBoundsException
-     *             if {@code offset < 0} or {@code count < 0}, or if {@code
-     *             offset + count} is greater than the length of {@code buffer}.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public final void readFully(byte[] buffer, int offset, int count)
-            throws IOException {
-        if (buffer == null) {
-            throw new NullPointerException();
-        }
-        // avoid int overflow
-        if (offset < 0 || offset > buffer.length || count < 0
-                || count > buffer.length - offset) {
-            throw new IndexOutOfBoundsException();
-        }
-        while (count > 0) {
-            int result = read(buffer, offset, count);
-            if (result < 0) {
-                throw new EOFException();
-            }
-            offset += result;
-            count -= result;
-        }
-    }
-
-    /**
-     * Reads a 32-bit integer from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next int value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readInt() throws IOException {
-        byte[] buffer = new byte[4];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
-                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
-    }
-
-    /**
-     * Reads a line of text form the current position in this file. A line is
-     * represented by zero or more characters followed by {@code '\n'}, {@code
-     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
-     * include the line terminating sequence.
-     * <p>
-     * Blocks until a line terminating sequence has been read, the end of the
-     * file is reached or an exception is thrown.
-     *
-     * @return the contents of the line or {@code null} if no characters have
-     *         been read before the end of the file has been reached.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final String readLine() throws IOException {
-        StringBuilder line = new StringBuilder(80); // Typical line length
-        boolean foundTerminator = false;
-        int unreadPosition = 0;
-        while (true) {
-            int nextByte = read();
-            switch (nextByte) {
-                case -1:
-                    return line.length() != 0 ? line.toString() : null;
-                case (byte) '\r':
-                    if (foundTerminator) {
-                        seekInternal(unreadPosition);
-                        return line.toString();
-                    }
-                    foundTerminator = true;
-                    /* Have to be able to peek ahead one byte */
-                    unreadPosition = position;
-                    break;
-                case (byte) '\n':
-                    return line.toString();
-                default:
-                    if (foundTerminator) {
-                        seekInternal(unreadPosition);
-                        return line.toString();
-                    }
-                    line.append((char) nextByte);
-            }
-        }
-    }
-
-    /**
-     * Reads a 64-bit long from the current position in this file. Blocks until
-     * eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next long value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final long readLong() throws IOException {
-        byte[] buffer = new byte[8];
-        int n = read(buffer, 0, buffer.length);
-        if (n != buffer.length) {
-            throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
-        }
-        return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
-                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
-                + ((long) (buffer[4] & 0xff) << 24)
-                + ((buffer[5] & 0xff) << 16)
-                + ((buffer[6] & 0xff) << 8)
-                + (buffer[7] & 0xff);
-    }
-
-    /**
-     * Reads a 16-bit short from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception
-     * is thrown.
-     *
-     * @return the next short value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final short readShort() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
-    }
-
-    /**
-     * Reads an unsigned 8-bit byte from the current position in this file and
-     * returns it as an integer. Blocks until one byte has been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned byte value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readUnsignedByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp;
-    }
-
-    /**
-     * Reads an unsigned 16-bit short from the current position in this file and
-     * returns it as an integer. Blocks until two bytes have been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned short value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readUnsignedShort() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
-    }
-
-    /**
-     * Reads a string that is encoded in {@link DataInput modified UTF-8} from
-     * this file. The number of bytes that must be read for the complete string
-     * is determined by the first two bytes read from the file. Blocks until all
-     * required bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next string encoded in {@link DataInput modified UTF-8} from
-     *         this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws UTFDataFormatException
-     *             if the bytes read cannot be decoded into a character string.
-     */
-    public final String readUTF() throws IOException {
-        return DataInputStream.readUTF(this);
-    }
-}
+package org.apache.cassandra.io.util;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.nio.MappedByteBuffer;
+import java.io.*;
+
+public class MappedFileDataInput extends InputStream implements FileDataInput
+{
+    private final MappedByteBuffer buffer;
+    private final String filename;
+    private int position;
+    private int markedPosition;
+
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename)
+    {
+        this(buffer, filename, 0);
+    }
+
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
+    {
+        assert buffer != null;
+        this.buffer = buffer;
+        this.filename = filename;
+        this.position = position;
+    }
+
+    // don't make this public, this is only for seeking WITHIN the current mapped segment
+    private void seekInternal(int pos) throws IOException
+    {
+        position = pos;
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    @Override
+    public void mark(int ignored)
+    {
+        markedPosition = position;
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        seekInternal(markedPosition);
+    }
+
+    public void mark()
+    {
+        mark(-1);
+    }
+
+    public int bytesPastMark()
+    {
+        assert position >= markedPosition;
+        return position - markedPosition;
+    }
+
+    public boolean isEOF() throws IOException
+    {
+        return position == buffer.capacity();
+    }
+
+    public String getPath()
+    {
+        return filename;
+    }
+
+    public int read() throws IOException
+    {
+        if (isEOF())
+            return -1;
+        return buffer.get(position++) & 0xFF;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        if (n <= 0)
+            return 0;
+        int oldPosition = position;
+        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
+        position = Math.min(buffer.capacity(), position + n);
+        return position - oldPosition;
+    }
+
+    /*
+     !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+     */
+
+    /**
+     * Reads a boolean from the current position in this file. Blocks until one
+     * byte has been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next boolean value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final boolean readBoolean() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return temp != 0;
+    }
+
+    /**
+     * Reads an 8-bit byte from the current position in this file. Blocks until
+     * one byte has been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next signed 8-bit byte value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final byte readByte() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return (byte) temp;
+    }
+
+    /**
+     * Reads a 16-bit character from the current position in this file. Blocks until
+     * two bytes have been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next char value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final char readChar() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+    }
+
+    /**
+     * Reads a 64-bit double from the current position in this file. Blocks
+     * until eight bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next double value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final double readDouble() throws IOException {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /**
+     * Reads a 32-bit float from the current position in this file. Blocks
+     * until four bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next float value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final float readFloat() throws IOException {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    /**
+     * Reads bytes from this file into {@code buffer}. Blocks until {@code
+     * buffer.length} number of bytes have been read, the end of the file is
+     * reached or an exception is thrown.
+     *
+     * @param buffer
+     *            the buffer to read bytes into.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     */
+    public final void readFully(byte[] buffer) throws IOException {
+        readFully(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Read bytes from this file into {@code buffer} starting at offset {@code
+     * offset}. This method blocks until {@code count} number of bytes have been
+     * read.
+     *
+     * @param buffer
+     *            the buffer to read bytes into.
+     * @param offset
+     *            the initial position in {@code buffer} to store the bytes read
+     *            from this file.
+     * @param count
+     *            the maximum number of bytes to store in {@code buffer}.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset < 0} or {@code count < 0}, or if {@code
+     *             offset + count} is greater than the length of {@code buffer}.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     */
+    public final void readFully(byte[] buffer, int offset, int count)
+            throws IOException {
+        if (buffer == null) {
+            throw new NullPointerException();
+        }
+        // avoid int overflow
+        if (offset < 0 || offset > buffer.length || count < 0
+                || count > buffer.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        while (count > 0) {
+            int result = read(buffer, offset, count);
+            if (result < 0) {
+                throw new EOFException();
+            }
+            offset += result;
+            count -= result;
+        }
+    }
+
+    /**
+     * Reads a 32-bit integer from the current position in this file. Blocks
+     * until four bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next int value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readInt() throws IOException {
+        byte[] buffer = new byte[4];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
+    }
+
+    /**
+     * Reads a line of text form the current position in this file. A line is
+     * represented by zero or more characters followed by {@code '\n'}, {@code
+     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
+     * include the line terminating sequence.
+     * <p>
+     * Blocks until a line terminating sequence has been read, the end of the
+     * file is reached or an exception is thrown.
+     *
+     * @return the contents of the line or {@code null} if no characters have
+     *         been read before the end of the file has been reached.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final String readLine() throws IOException {
+        StringBuilder line = new StringBuilder(80); // Typical line length
+        boolean foundTerminator = false;
+        int unreadPosition = 0;
+        while (true) {
+            int nextByte = read();
+            switch (nextByte) {
+                case -1:
+                    return line.length() != 0 ? line.toString() : null;
+                case (byte) '\r':
+                    if (foundTerminator) {
+                        seekInternal(unreadPosition);
+                        return line.toString();
+                    }
+                    foundTerminator = true;
+                    /* Have to be able to peek ahead one byte */
+                    unreadPosition = position;
+                    break;
+                case (byte) '\n':
+                    return line.toString();
+                default:
+                    if (foundTerminator) {
+                        seekInternal(unreadPosition);
+                        return line.toString();
+                    }
+                    line.append((char) nextByte);
+            }
+        }
+    }
+
+    /**
+     * Reads a 64-bit long from the current position in this file. Blocks until
+     * eight bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next long value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final long readLong() throws IOException {
+        byte[] buffer = new byte[8];
+        int n = read(buffer, 0, buffer.length);
+        if (n != buffer.length) {
+            throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
+        }
+        return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
+                + ((long) (buffer[4] & 0xff) << 24)
+                + ((buffer[5] & 0xff) << 16)
+                + ((buffer[6] & 0xff) << 8)
+                + (buffer[7] & 0xff);
+    }
+
+    /**
+     * Reads a 16-bit short from the current position in this file. Blocks until
+     * two bytes have been read, the end of the file is reached or an exception
+     * is thrown.
+     *
+     * @return the next short value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final short readShort() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+    }
+
+    /**
+     * Reads an unsigned 8-bit byte from the current position in this file and
+     * returns it as an integer. Blocks until one byte has been read, the end of
+     * the file is reached or an exception is thrown.
+     *
+     * @return the next unsigned byte value from this file as an int.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readUnsignedByte() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return temp;
+    }
+
+    /**
+     * Reads an unsigned 16-bit short from the current position in this file and
+     * returns it as an integer. Blocks until two bytes have been read, the end of
+     * the file is reached or an exception is thrown.
+     *
+     * @return the next unsigned short value from this file as an int.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readUnsignedShort() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
+    }
+
+    /**
+     * Reads a string that is encoded in {@link DataInput modified UTF-8} from
+     * this file. The number of bytes that must be read for the complete string
+     * is determined by the first two bytes read from the file. Blocks until all
+     * required bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next string encoded in {@link DataInput modified UTF-8} from
+     *         this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws UTFDataFormatException
+     *             if the bytes read cannot be decoded into a character string.
+     */
+    public final String readUTF() throws IOException {
+        return DataInputStream.readUTF(this);
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Feb 23 18:13:07 2010
@@ -1,69 +1,90 @@
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.Socket;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.streaming.IncomingStreamReader;
-
-public class IncomingTcpConnection extends Thread
-{
-    private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
-
-    private final DataInputStream input;
-    private Socket socket;
-
-    public IncomingTcpConnection(Socket socket)
-    {
-        this.socket = socket;
-        try
-        {
-            input = new DataInputStream(socket.getInputStream());
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    @Override
-    public void run()
-    {
-        while (true)
-        {
-            try
-            {
-                MessagingService.validateMagic(input.readInt());
-                int header = input.readInt();
-                int type = MessagingService.getBits(header, 1, 2);
-                boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-                int version = MessagingService.getBits(header, 15, 8);
-
-                if (isStream)
-                {
-                    new IncomingStreamReader(socket.getChannel()).read();
-                }
-                else
-                {
-                    int size = input.readInt();
-                    byte[] contentBytes = new byte[size];
-                    input.readFully(contentBytes);
-                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
-                }
-            }
-            catch (EOFException e)
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("eof reading from socket; closing", e);
-                break;
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("error reading from socket; closing", e);
-                break;
-            }
-        }
-    }
-}
+package org.apache.cassandra.net;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.*;
+import java.net.Socket;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.streaming.IncomingStreamReader;
+
+public class IncomingTcpConnection extends Thread
+{
+    private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
+
+    private final DataInputStream input;
+    private Socket socket;
+
+    public IncomingTcpConnection(Socket socket)
+    {
+        this.socket = socket;
+        try
+        {
+            input = new DataInputStream(socket.getInputStream());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        while (true)
+        {
+            try
+            {
+                MessagingService.validateMagic(input.readInt());
+                int header = input.readInt();
+                int type = MessagingService.getBits(header, 1, 2);
+                boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                int version = MessagingService.getBits(header, 15, 8);
+
+                if (isStream)
+                {
+                    new IncomingStreamReader(socket.getChannel()).read();
+                }
+                else
+                {
+                    int size = input.readInt();
+                    byte[] contentBytes = new byte[size];
+                    input.readFully(contentBytes);
+                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                }
+            }
+            catch (EOFException e)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("eof reading from socket; closing", e);
+                break;
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error reading from socket; closing", e);
+                break;
+            }
+        }
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.net;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataOutputStream;
 import java.io.IOException;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.File;
 import java.io.FileOutputStream;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Tue Feb 23 18:13:07 2010
@@ -1,100 +1,121 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-class CompletedFileStatus
-{
-    private static ICompactSerializer<CompletedFileStatus> serializer_;
-
-    public static enum StreamCompletionAction
-    {
-        DELETE,
-        STREAM
-    }
-
-    static
-    {
-        serializer_ = new CompletedFileStatusSerializer();
-    }
-
-    public static ICompactSerializer<CompletedFileStatus> serializer()
-    {
-        return serializer_;
-    }
-
-    private String file_;
-    private long expectedBytes_;
-    private StreamCompletionAction action_;
-
-    public CompletedFileStatus(String file, long expectedBytes)
-    {
-        file_ = file;
-        expectedBytes_ = expectedBytes;
-        action_ = StreamCompletionAction.DELETE;
-    }
-
-    public String getFile()
-    {
-        return file_;
-    }
-
-    public long getExpectedBytes()
-    {
-        return expectedBytes_;
-    }
-
-    public void setAction(StreamCompletionAction action)
-    {
-        action_ = action;
-    }
-
-    public StreamCompletionAction getAction()
-    {
-        return action_;
-    }
-
-    public Message makeStreamStatusMessage() throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        CompletedFileStatus.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
-    }
-
-    private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
-    {
-        public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(streamStatus.getFile());
-            dos.writeLong(streamStatus.getExpectedBytes());
-            dos.writeInt(streamStatus.getAction().ordinal());
-        }
-
-        public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
-
-            int ordinal = dis.readInt();
-            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.DELETE);
-            }
-            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.STREAM);
-            }
-
-            return streamStatus;
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+class CompletedFileStatus
+{
+    private static ICompactSerializer<CompletedFileStatus> serializer_;
+
+    public static enum StreamCompletionAction
+    {
+        DELETE,
+        STREAM
+    }
+
+    static
+    {
+        serializer_ = new CompletedFileStatusSerializer();
+    }
+
+    public static ICompactSerializer<CompletedFileStatus> serializer()
+    {
+        return serializer_;
+    }
+
+    private String file_;
+    private long expectedBytes_;
+    private StreamCompletionAction action_;
+
+    public CompletedFileStatus(String file, long expectedBytes)
+    {
+        file_ = file;
+        expectedBytes_ = expectedBytes;
+        action_ = StreamCompletionAction.DELETE;
+    }
+
+    public String getFile()
+    {
+        return file_;
+    }
+
+    public long getExpectedBytes()
+    {
+        return expectedBytes_;
+    }
+
+    public void setAction(StreamCompletionAction action)
+    {
+        action_ = action;
+    }
+
+    public StreamCompletionAction getAction()
+    {
+        return action_;
+    }
+
+    public Message makeStreamStatusMessage() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        CompletedFileStatus.serializer().serialize(this, dos);
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+    }
+
+    private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
+    {
+        public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal());
+        }
+
+        public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
+
+            int ordinal = dis.readInt();
+            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.DELETE);
+            }
+            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.STREAM);
+            }
+
+            return streamStatus;
+        }
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/PendingFile.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Tue Feb 23 18:13:07 2010
@@ -1,62 +1,83 @@
-package org.apache.cassandra.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamInManager;
-import org.apache.cassandra.service.StorageService;
-
-/**
- * This is the callback handler that is invoked when we have
- * completely received a single file from a remote host.
- *
- * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
-*/
-class StreamCompletionHandler implements IStreamComplete
-{
-    private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
-
-    public void onStreamCompletion(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) throws IOException
-    {
-        /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
-        if (pendingFile.getTargetFile().contains("-Data.db"))
-        {
-            String tableName = pendingFile.getTable();
-            File file = new File( pendingFile.getTargetFile() );
-            String fileName = file.getName();
-            String [] temp = fileName.split("-");
-
-            //Open the file to see if all parts are now here
-            try
-            {
-                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getTargetFile());
-                //TODO add a sanity check that this sstable has all its parts and is ok
-                Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
-                logger.info("Streaming added " + sstable.getFilename());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException("Not able to add streamed file " + pendingFile.getTargetFile(), e);
-            }
-        }
-
-        if (logger.isDebugEnabled())
-          logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
-        /* Send a StreamStatus message which may require the source node to re-stream certain files. */
-        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
-
-        /* If we're done with everything for this host, remove from bootstrap sources */
-        if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
-        {
-            StorageService.instance.removeBootstrapSource(host, pendingFile.getTable());
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.StreamInManager;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * This is the callback handler that is invoked when we have
+ * completely received a single file from a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
+*/
+class StreamCompletionHandler implements IStreamComplete
+{
+    private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
+
+    public void onStreamCompletion(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) throws IOException
+    {
+        /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
+        if (pendingFile.getTargetFile().contains("-Data.db"))
+        {
+            String tableName = pendingFile.getTable();
+            File file = new File( pendingFile.getTargetFile() );
+            String fileName = file.getName();
+            String [] temp = fileName.split("-");
+
+            //Open the file to see if all parts are now here
+            try
+            {
+                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getTargetFile());
+                //TODO add a sanity check that this sstable has all its parts and is ok
+                Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+                logger.info("Streaming added " + sstable.getFilename());
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Not able to add streamed file " + pendingFile.getTargetFile(), e);
+            }
+        }
+
+        if (logger.isDebugEnabled())
+          logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
+        /* Send a StreamStatus message which may require the source node to re-stream certain files. */
+        MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
+
+        /* If we're done with everything for this host, remove from bootstrap sources */
+        if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
+        {
+            StorageService.instance.removeBootstrapSource(host, pendingFile.getTable());
+        }
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Tue Feb 23 18:13:07 2010
@@ -1,48 +1,69 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamFinishedVerbHandler implements IVerbHandler
-{
-    private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-        byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-
-        try
-        {
-            CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
-
-            switch (streamStatus.getAction())
-            {
-                case DELETE:
-                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
-                    break;
-
-                case STREAM:
-                    if (logger.isDebugEnabled())
-                        logger.debug("Need to re-stream file " + streamStatus.getFile());
-                    StreamOutManager.get(message.getFrom()).startNext();
-                    break;
-
-                default:
-                    break;
-            }
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+public class StreamFinishedVerbHandler implements IVerbHandler
+{
+    private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+        try
+        {
+            CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
+
+            switch (streamStatus.getAction())
+            {
+                case DELETE:
+                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
+                    break;
+
+                case STREAM:
+                    if (logger.isDebugEnabled())
+                        logger.debug("Need to re-stream file " + streamStatus.getFile());
+                    StreamOutManager.get(message.getFrom()).startNext();
+                    break;
+
+                default:
+                    break;
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Feb 23 18:13:07 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.net.InetAddress;
 import java.util.Collection;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=915463&r1=915462&r2=915463&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Tue Feb 23 18:13:07 2010
@@ -1,19 +1,40 @@
-package org.apache.cassandra.streaming;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamInitiateDoneVerbHandler implements IVerbHandler
-{
-    private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-        if (logger.isDebugEnabled())
-          logger.debug("Received a stream initiate done message ...");
-        StreamOutManager.get(message.getFrom()).startNext();
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+public class StreamInitiateDoneVerbHandler implements IVerbHandler
+{
+    private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        if (logger.isDebugEnabled())
+          logger.debug("Received a stream initiate done message ...");
+        StreamOutManager.get(message.getFrom()).startNext();
+    }
+}



Mime
View raw message