Can you copy the SSTables as a separate task after the load operation? You should know where the files are.

Multiple files may be created by the writer during the loading process, so running code that performs a long-running action will impact the time taken to pump data through the SSTableSimpleUnsortedWriter.
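
As a rough sketch of the "copy after the load" approach: once the writer has been closed, every file in its output directory can be copied in one pass. The class and method names here (CopySSTablesAfterLoad, copyAll) are made up for illustration, and the copy goes to a local directory using java.nio.file (Java 7+); for HDFS you would swap the copy call for the Hadoop FileSystem API.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of Cassandra: run it after the writer is closed.
class CopySSTablesAfterLoad
{
    /** Copies every regular file in sourceDir into targetDir and returns how many were copied. */
    static int copyAll(Path sourceDir, Path targetDir) throws IOException
    {
        Files.createDirectories(targetDir);
        int copied = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(sourceDir))
        {
            for (Path file : files)
            {
                // Skip sub-directories; only the SSTable component files are wanted.
                if (!Files.isRegularFile(file))
                    continue;
                Files.copy(file, targetDir.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
                copied++;
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException
    {
        // args[0]: the directory that was given to the writer; args[1]: the destination.
        int n = copyAll(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("Copied " + n + " files");
    }
}
```

Because this runs after the load, it adds nothing to the time spent inside the writer.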

WRT the patch, the best place to start the conversation for this is on https://issues.apache.org/jira/browse/CASSANDRA

Thanks for taking the time to look into this.

Cheers


-----------------
Aaron Morton
Freelance Developer
@aaronmorton

On 3/05/2012, at 11:40 PM, Benoit Perroud wrote:

Hi All,

I'm bulk loading (a lot of) data from Hadoop into Cassandra 1.0.x. The
provided CFOutputFormat is not the best fit here, so I wanted to use the
bulk loading feature. I know 1.1 comes with a BulkOutputFormat, but I
wanted to propose a simple enhancement to SSTableSimpleUnsortedWriter
that could make life easier:

When the table is flushed to disk, it could be useful to have
listeners that are triggered to perform some action (copying my
SSTable into HDFS, for instance).

Please have a look at the patch below to get a better idea. Do you
think it would be worthwhile opening a JIRA for this?


Regarding the 1.1 BulkOutputFormat and bulk loading in general, the
work done to have a light client stream into the cluster is really
great. The issue now is that data is streamed only at the end of the
task. This causes every task to store its data locally and stream
everything at the end. Lots of temporary space may be needed, and a
lot of bandwidth to the nodes is used at the "same" time. With the
listener, we would be able to start streaming as soon as the first
table is created. That way the streaming bandwidth could be better
balanced. A JIRA for this also?

Thanks

Benoit.




--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;

@@ -47,6 +49,8 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
    private final long bufferSize;
    private long currentSize;

+    private final List<SSTableWriterListener> sSTableWrittenListeners = new LinkedList<SSTableWriterListener>();
+
    /**
     * Create a new buffering writer.
     * @param directory the directory where to write the sstables
@@ -123,5 +127,16 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
        }
        currentSize = 0;
        keys.clear();
+
+        // Notify the registered listeners
+        for (SSTableWriterListener listener : sSTableWrittenListeners)
+        {
+            listener.onSSTableWrittenAndClosed(writer.getTableName(), writer.getColumnFamilyName(), writer.getFilename());
+        }
+    }
+
+    public void addSSTableWriterListener(SSTableWriterListener listener)
+    {
+       sSTableWrittenListeners.add(listener);
    }
}
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
new file mode 100644
index 0000000..6628d20
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+public interface SSTableWriterListener {
+
+       void onSSTableWrittenAndClosed(final String tableName, final String columnFamilyName, final String filename) throws IOException;
+
+}
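
On the concern that a long-running listener would slow down the writer, one possible way around it is to have the listener only queue the work and let a background thread do the copy or streaming. The sketch below is an illustration under assumptions, not part of the patch: SSTableWriterListener is redeclared with the same shape as in the diff above so the example compiles on its own (it does not exist in Cassandra), and AsyncSSTableListener and shutdownAndWait are hypothetical names.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same shape as the interface proposed in the patch; redeclared so this sketch stands alone.
interface SSTableWriterListener
{
    void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename) throws IOException;
}

// Hypothetical wrapper: returns to the writer immediately and runs the real listener on a worker thread.
class AsyncSSTableListener implements SSTableWriterListener
{
    // A single worker thread keeps the SSTables in the order they were written.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final SSTableWriterListener delegate;

    AsyncSSTableListener(SSTableWriterListener delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public void onSSTableWrittenAndClosed(final String tableName, final String columnFamilyName, final String filename)
    {
        worker.submit(() -> {
            try
            {
                delegate.onSSTableWrittenAndClosed(tableName, columnFamilyName, filename);
            }
            catch (IOException e)
            {
                // A real implementation would need to report this back to the task.
                System.err.println("Failed to handle " + filename + ": " + e);
            }
        });
    }

    /** Stops accepting new work and waits for queued work; returns false on timeout. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
    {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }
}
```

The task would call shutdownAndWait before finishing, so that the last SSTables are handled before the local files are cleaned up.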