From: aaron morton
To: user@cassandra.apache.org
Subject: Re: SSTableWriter and Bulk Loading life cycle enhancement
Date: Mon, 7 May 2012 22:47:44 +1200

Can you copy the sstables as a task after the load operation? You should know where the files are.
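Here is a minimal sketch of that post-load copy step. It assumes the writer has already been closed and that every file left in its output directory should be copied. `SSTableCopyTask` and `copyAll` are hypothetical names. The destination is a local directory standing in for HDFS; a real job would use Hadoop's own file system API rather than `java.nio.file`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical post-load step: after the bulk-load writer has been closed,
 * copy every file it left in its output directory to a destination.
 * The destination is a local directory standing in for HDFS (assumption).
 */
final class SSTableCopyTask {
    private SSTableCopyTask() {}

    /** Copies each regular file in sstableDir into destDir and returns the copied names, sorted. */
    static List<String> copyAll(Path sstableDir, Path destDir) {
        List<String> copied = new ArrayList<>();
        try {
            Files.createDirectories(destDir);
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(sstableDir)) {
                for (Path entry : entries) {
                    if (!Files.isRegularFile(entry)) {
                        continue; // skip sub-directories and other non-files
                    }
                    String name = entry.getFileName().toString();
                    // Copy every component (Data, Index, Filter, ...) so each SSTable stays complete.
                    Files.copy(entry, destDir.resolve(name), StandardCopyOption.REPLACE_EXISTING);
                    copied.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(copied);
        return copied;
    }
}
```

The copy runs once, after the load has finished, so it does not compete with the writer while rows are being buffered and flushed.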

The writer may create multiple files during the loading process, so running code that performs a long-running action each time a file is written will increase the time taken to pump data through the SSTableSimpleUnsortedWriter.
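To make the concern concrete: in the patch quoted below, listeners are called synchronously inside the writer's flush, so whatever a listener does is added to every flush. If a listener does stay on that path, one option is to hand the slow work to a background thread. This sketch assumes a local copy of the patch's `SSTableWriterListener` interface, which is not part of released Cassandra 1.0.x. `AsyncSSTableWriterListener` is a hypothetical decorator and is not part of the patch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Local copy of the callback interface proposed in the patch (not in released Cassandra 1.0.x). */
interface SSTableWriterListener {
    void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename) throws IOException;
}

/**
 * Hypothetical decorator: the callback returns immediately and the (possibly slow)
 * delegate runs on a single background thread, so the writer's flush is not held up.
 * One thread means callbacks run in the order the SSTables were flushed.
 */
final class AsyncSSTableWriterListener implements SSTableWriterListener, AutoCloseable {
    private final SSTableWriterListener delegate;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    AsyncSSTableWriterListener(SSTableWriterListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename) {
        executor.execute(() -> {
            try {
                delegate.onSSTableWrittenAndClosed(tableName, columnFamilyName, filename);
            } catch (IOException e) {
                // Reaches the worker thread's uncaught-exception handler, not the writer.
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Waits for every queued callback to finish; call this after the writer is closed. */
    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
}
```

The trade-off is that the writer can finish while copies are still queued, so `close()` must return before the task reports success. A failed copy also no longer fails the flush that triggered it.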

With regard to the patch, the best place to start the conversation is on https://issues.apache.org/jira/browse/CASSANDRA

Thanks for taking the time to look into this.

Cheers
-----------------
Aaron Morton
Freelance Developer
@aaronmorton
http://www.thelastpickle.com

On 3/05/2012, at 11:40 PM, Benoit Perroud wrote:

Hi all,

I'm bulk loading (a lot of) data from Hadoop into Cassandra 1.0.x. The
provided CFOutputFormat is not the best fit here; I wanted to use the
bulk loading feature. I know 1.1 comes with a BulkOutputFormat, but I
wanted to propose a simple enhancement to SSTableSimpleUnsortedWriter
that could make life easier:

When the table is flushed to disk, it would be useful to have
listeners that are triggered to perform some action (copying
my SSTable into HDFS, for instance).

Please have a look at the patch below to get a better idea. Do you
think it would be worthwhile opening a JIRA ticket for this?


Regarding the 1.1 BulkOutputFormat and bulk loading in general, the work
done to let a light client stream into the cluster is really great. The
issue now is that data is streamed only at the end of the task. This
causes every task to store its data locally and stream everything
at the end. A lot of temporary space may be needed, and a lot of
bandwidth to the nodes is used at the "same" time. With the listener,
we would be able to start streaming as soon as the first table is
created. That way the streaming bandwidth could be better balanced.
Should I open a JIRA for this as well?
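Here is a rough sketch of the "stream as soon as the first table is created" idea as a producer/consumer pipeline. Each completed SSTable's file name is queued, and a single background thread streams it immediately instead of waiting for the end of the task. `IncrementalStreamer` is a hypothetical name. The `streamOne` callback is a stand-in for whatever actually ships the file to the cluster; it is not Cassandra's streaming API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * Hypothetical incremental streamer: the writer side enqueues each finished
 * SSTable's file name and a background thread streams it right away.
 */
final class IncrementalStreamer {
    // NUL cannot appear in a file name, so this poison pill cannot collide with a real SSTable.
    private static final String END_OF_TASK = "\u0000END";

    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();
    private final Thread worker;

    IncrementalStreamer(Consumer<String> streamOne) {
        worker = new Thread(() -> {
            try {
                // Stream files in the order they were written until the end-of-task marker arrives.
                for (String file = ready.take(); !file.equals(END_OF_TASK); file = ready.take()) {
                    streamOne.accept(file);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sstable-streamer");
        worker.start();
    }

    /** Called when an SSTable has been written and closed, e.g. from a writer listener. */
    void sstableReady(String filename) {
        ready.add(filename);
    }

    /** Called at the end of the task: blocks until everything queued has been streamed. */
    void finish() throws InterruptedException {
        ready.add(END_OF_TASK);
        worker.join();
    }
}
```

If streaming keeps up with the writer, each file can be shipped (and deleted, if desired) shortly after it is written. Local temporary space then stays closer to a few buffers' worth per task than to the task's whole output, and streaming traffic is spread over the life of the task.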

Thanks

Benoit.




--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;

@@ -47,6 +49,8 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final long bufferSize;
     private long currentSize;

+    private final List<SSTableWriterListener> sSTableWrittenListeners = new LinkedList<SSTableWriterListener>();
+
     /**
      * Create a new buffering writer.
      * @param directory the directory where to write the sstables
@@ -123,5 +127,16 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         }
         currentSize = 0;
         keys.clear();
+
+        // Notify the registered listeners
+        for (SSTableWriterListener listeners : sSTableWrittenListeners)
+        {
+            listeners.onSSTableWrittenAndClosed(writer.getTableName(), writer.getColumnFamilyName(), writer.getFilename());
+        }
+    }
+
+    public void addSSTableWriterListener(SSTableWriterListener listener)
+    {
+        sSTableWrittenListeners.add(listener);
     }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
new file mode 100644
index 0000000..6628d20
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriterListener.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+
+public interface SSTableWriterListener {
+
+    void onSSTableWrittenAndClosed(final String tableName, final String columnFamilyName, final String filename) throws IOException;
+
+}
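Here is a self-contained model of how the proposed API would be used without pulling in Cassandra. `BufferingWriterModel` is a hypothetical stand-in for `SSTableSimpleUnsortedWriter`: it counts buffered rows instead of bytes and invents file names (the `<keyspace>-<cf>-<generation>-Data.db` pattern is made up). Like the patched `sync()`, it notifies listeners on the writer's own thread after each flush. Flushing the remainder on `close()` is an assumption about the real writer.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Local copy of the callback interface added by the patch (not in released Cassandra 1.0.x). */
interface SSTableWriterListener {
    void onSSTableWrittenAndClosed(String tableName, String columnFamilyName, String filename) throws IOException;
}

/**
 * Hypothetical model of the patched writer: rows are buffered in memory and, once
 * bufferSize rows are held, one "SSTable" is flushed and every registered listener
 * is notified synchronously on the writer's thread.
 */
final class BufferingWriterModel {
    private final String tableName;
    private final String columnFamilyName;
    private final int bufferSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<SSTableWriterListener> listeners = new CopyOnWriteArrayList<>();
    private int generation = 0;

    BufferingWriterModel(String tableName, String columnFamilyName, int bufferSize) {
        this.tableName = tableName;
        this.columnFamilyName = columnFamilyName;
        this.bufferSize = bufferSize;
    }

    /** Mirrors addSSTableWriterListener from the patch. */
    void addSSTableWriterListener(SSTableWriterListener listener) {
        listeners.add(listener);
    }

    void addRow(String key) throws IOException {
        buffer.add(key);
        if (buffer.size() >= bufferSize) {
            sync();
        }
    }

    /** Flushes whatever is still buffered (assumed to match the real writer's close()). */
    void close() throws IOException {
        sync();
    }

    private void sync() throws IOException {
        if (buffer.isEmpty()) {
            return;
        }
        generation++;
        // Made-up file name; the real writer takes it from its SSTableWriter.
        String filename = tableName + "-" + columnFamilyName + "-" + generation + "-Data.db";
        buffer.clear(); // the real writer appends the buffered rows to the SSTable before clearing
        // As in the patch: every listener runs before sync() returns, so a slow listener slows the load.
        for (SSTableWriterListener listener : listeners) {
            listener.onSSTableWrittenAndClosed(tableName, columnFamilyName, filename);
        }
    }
}
```

With a two-row buffer, writing three rows and then closing produces two files. The listener hears about each file as soon as it is closed, not at the end of the task.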
