incubator-cassandra-user mailing list archives

From Benoit Perroud <ben...@noisette.ch>
Subject SSTableWriter and Bulk Loading life cycle enhancement
Date Thu, 03 May 2012 11:40:35 GMT
Hi All,

I'm bulk loading (a lot of) data from Hadoop into Cassandra 1.0.x. The
provided CFOutputFormat is not the best fit here, so I wanted to use the
bulk loading feature. I know 1.1 comes with a BulkOutputFormat, but I
wanted to propose a simple enhancement to SSTableSimpleUnsortedWriter
that could make life easier:

When the table is flushed to disk, it would be useful to have
listeners that are triggered to perform some action (copying
my SSTable into HDFS, for instance).
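
As a rough illustration of what such a listener could do, here is a minimal
sketch that copies each flushed file into another directory. The interface is
a local stand-in mirroring the one proposed in the patch; CopyingListener and
the target layout are hypothetical, and a plain directory stands in for HDFS.
It uses java.nio.file, so it assumes Java 7 or later.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Local stand-in mirroring the interface proposed in the patch.
interface SSTableWriterListener {
    void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename)
            throws IOException;
}

// Hypothetical listener: copies the reported file to <targetRoot>/<table>/<columnFamily>/.
final class CopyingListener implements SSTableWriterListener {
    private final Path targetRoot;

    CopyingListener(Path targetRoot) {
        this.targetRoot = targetRoot;
    }

    @Override
    public void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename)
            throws IOException {
        Path source = Paths.get(filename);
        Path targetDir = targetRoot.resolve(tableName).resolve(columnFamilyName);
        Files.createDirectories(targetDir);
        // Only the file named in the callback is copied here.
        Files.copy(source, targetDir.resolve(source.getFileName()),
                   StandardCopyOption.REPLACE_EXISTING);
    }
}
```

An SSTable is normally made up of several component files, so a real listener
would probably have to copy the sibling files of the reported one as well.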

Please have a look at the patch below to get a better idea. Do you
think it would be worthwhile to open a JIRA for this?


Regarding the 1.1 BulkOutputFormat and bulk loading in general, the work
done to have a light client stream into the cluster is really great. The
issue now is that data is streamed only at the end of the task. This
causes every task to store its data locally and stream everything
at the end. Lots of temporary space may be needed, and a lot of
bandwidth to the nodes is used at the "same" time. With the listener,
we would be able to start streaming as soon as the first table is
created. That way the streaming bandwidth could be better balanced.
A JIRA for this as well?
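
To make the idea concrete, a listener could hand each finished SSTable to a
background thread, so that transfers overlap with the writing of later tables.
This is only a sketch under assumptions: the interface is a local stand-in for
the one in the patch, StreamingListener and finish() are hypothetical names,
and the Consumer stands in for whatever code actually streams one SSTable to
the cluster. It uses lambdas, so it assumes Java 8 or later.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Local stand-in mirroring the interface proposed in the patch.
interface SSTableWriterListener {
    void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename)
            throws IOException;
}

// Hypothetical listener: queues each finished SSTable for streaming in the background.
final class StreamingListener implements SSTableWriterListener {
    // A single worker thread streams the files one at a time, in flush order.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    // Assumption: the actual streaming logic is supplied by the caller.
    private final Consumer<String> streamer;

    StreamingListener(Consumer<String> streamer) {
        this.streamer = streamer;
    }

    @Override
    public void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename) {
        // Returns immediately so the writer can keep buffering the next table.
        pool.submit(() -> streamer.accept(filename));
    }

    // To be called at the end of the task: waits for the queued transfers to complete.
    boolean finish(long timeout, TimeUnit unit) throws InterruptedException {
        pool.shutdown();
        return pool.awaitTermination(timeout, unit);
    }
}
```

With this shape, the task would only need to call finish() before exiting, and
local files could be deleted as soon as their transfer completes.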

Thanks

Benoit.




--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;

@@ -47,6 +49,8 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final long bufferSize;
     private long currentSize;

+    private final List<SSTableWriterListener> sSTableWrittenListeners = new LinkedList<SSTableWriterListener>();
+
     /**
      * Create a new buffering writer.
      * @param directory the directory where to write the sstables
@@ -123,5 +127,16 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         }
         currentSize = 0;
         keys.clear();
+
+        // Notify the registered listeners
+        for (SSTableWriterListener listener : sSTableWrittenListeners)
+        {
+            listener.onSSTableWrittenAndClosed(writer.getTableName(), writer.getColumnFamilyName(), writer.getFilename());
+        }
+    }
+
+    public void addSSTableWriterListener(SSTableWriterListener listener)
+    {
+       sSTableWrittenListeners.add(listener);
     }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
new file mode 100644
index 0000000..6628d20
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+public interface SSTableWriterListener {
+
+       void onSSTableWrittenAndClosed(final String tableName, final String columnFamilyName, final String filename) throws IOException;
+
+}