cassandra-commits mailing list archives

From "Karthick Sankarachary (JIRA)" <>
Subject [jira] Commented: (CASSANDRA-1101) A Hadoop Output Format That Targets Cassandra
Date Fri, 04 Jun 2010 22:30:55 GMT


Karthick Sankarachary commented on CASSANDRA-1101:

   {quote}* ColumnWritable implements byte[] comparison, but should use o.a.c.utils.FBUtilities.compareByteArrays{quote}
That totally makes sense.
   {quote}* ColumnWritable implements equality in terms of reference equality: is that intentional?{quote}
To quote from Comparable#compareTo's Javadoc: 'It is strongly recommended, but not strictly
required that (x.compareTo(y)==0) == (x.equals(y)). Generally speaking, any class that implements
the Comparable interface and violates this condition should clearly indicate this fact. The
recommended language is "Note: this class has a natural ordering that is inconsistent with
equals." '
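For illustration, here is a minimal sketch of what keeping compareTo and equals consistent could look like for a column holding raw bytes. ByteColumn and compareBytes are hypothetical names made up for this example; they stand in for ColumnWritable and FBUtilities.compareByteArrays and are not the code in the patch. The comparison is assumed to be an unsigned, lexicographic one.

```java
import java.util.Arrays;

// Hypothetical stand-in for ColumnWritable; not the class from the patch.
final class ByteColumn implements Comparable<ByteColumn> {
    private final byte[] name;
    private final byte[] value;

    ByteColumn(byte[] name, byte[] value) {
        // Defensive copies so later changes to the caller's arrays cannot alter ordering.
        this.name = name.clone();
        this.value = value.clone();
    }

    // Assumed ordering: lexicographic, bytes compared as unsigned, shorter array first on a tie.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x < y ? -1 : 1;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    @Override
    public int compareTo(ByteColumn other) {
        int c = compareBytes(name, other.name);
        return c != 0 ? c : compareBytes(value, other.value);
    }

    // equals agrees with compareTo: true exactly when compareTo returns 0.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ByteColumn)) return false;
        ByteColumn other = (ByteColumn) o;
        return Arrays.equals(name, other.name) && Arrays.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(name) + Arrays.hashCode(value);
    }
}
```

With this shape, two columns built from equal byte arrays compare as 0, are equal, and hash alike, so the class would not need the "inconsistent with equals" note.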
   {quote}* ColumnWritable assumes both name and value can be converted to Strings, which
is not safe: use FBUtilities.(write|read)ByteArray instead{quote}
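To make the concern concrete, here is a rough sketch of length-prefixed byte array serialization, which avoids the String round trip altogether. ByteArrayIO and its two methods are hypothetical helpers written for this example; they only approximate what FBUtilities.(write|read)ByteArray is assumed to do and are not copied from Cassandra.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical helper; an assumption about the general technique, not Cassandra's code.
final class ByteArrayIO {
    private ByteArrayIO() {}

    // Writes a 4-byte length followed by the raw bytes, with no charset involved.
    static void writeByteArray(byte[] bytes, DataOutput out) throws IOException {
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Reads back exactly what writeByteArray wrote.
    static byte[] readByteArray(DataInput in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("negative length: " + length);
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return bytes;
    }
}
```

Arbitrary bytes such as 0xFF or 0x80 survive this round trip unchanged, whereas decoding them as a String and encoding them again can silently replace byte sequences that are not valid in the chosen charset.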
   {quote}* Why add EmbeddedServer rather than using EmbeddedCassandraService?{quote}
There was a two-fold reason why I chose to write EmbeddedServer: (a) The EmbeddedCassandraService
appears to be locked into the Thrift CassandraDaemon, whereas EmbeddedServer works for both
the Thrift and Avro (and hopefully any future remoting framework), (b) The EmbeddedCassandraService
does not follow the exact same lifecycle as is prescribed in the main method of the CassandraDaemon,
which the EmbeddedServer does (to the letter). In fact, I believe that we should try and define
an abstraction for the CassandraDaemon (as I've attempted to do in CASSANDRA-1131), because
it'll make it easier for tools and test frameworks to launch the Cassandra service in a transport-agnostic

The latest patch incorporates the changes described above, and includes your patch to boot.

> A Hadoop Output Format That Targets Cassandra
> ---------------------------------------------
>                 Key: CASSANDRA-1101
>                 URL:
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Hadoop
>            Reporter: Karthick Sankarachary
>            Assignee: Stu Hood
>             Fix For: 0.7
>         Attachments: 1101-clock-fix.diff, CASSANDRA-1101-V1.patch, CASSANDRA-1101-V2.patch,
CASSANDRA-1101-V3.patch, CASSANDRA-1101-V4.patch, CASSANDRA-1101.patch
> Currently, there exists a Hadoop-specific input format (viz., ColumnFamilyInputFormat)
that allows one to iterate over the rows in a given Cassandra column family and treat it as
the input to a Hadoop map task. By the same token, one may need to feed the output of a Hadoop
reduce task into a Cassandra column family, for which no mechanism exists today. This calls
for the definition of a Hadoop-specific output format which accepts a pair of key and columns,
and writes it out to a given column family.
> Here, we describe an output format known as ColumnFamilyOutputFormat, which allows reduce
tasks to persist keys and their associated columns as Cassandra rows in a given column family.
 By default, it prevents overwriting existing rows in the column family, by ensuring at initialization
time that it contains no rows in the given slice predicate. For the sake of speed, it employs
a lazy write-back caching mechanism, where its record writer batches mutations created based
on the reduce's inputs (in a task-specific map) but stops short of actually mutating the rows.
The latter responsibility falls on its output committer, which makes the changes official
by sending a batch mutate request to Cassandra.  
> The record writer, which is called ColumnFamilyRecordWriter, maps the input <key,
value> pairs to a Cassandra column family. In particular, it creates mutations for each
column in the value, which it then associates with the key, and in turn the responsible endpoint.
 Note that, given that round trips to the server are fairly expensive, it merely batches the
mutations in memory, and leaves it to the output committer to send the batched mutations to
the server.  Furthermore, the writer groups the mutations by the endpoint responsible for
the rows being affected. This allows the output committer to execute the mutations in parallel,
on an endpoint-by-endpoint basis.
> The output committer, which is called ColumnFamilyOutputCommitter, traverses the mutations
collected by the record writer, and sends them to the endpoints responsible for them. Since
the total set of mutations is partitioned by endpoint, and each partition can be applied
in parallel, we can commit the mutations using multiple threads, one per endpoint.
As a result, it reduces the time it takes to propagate the mutations to the server considering
that (a) the client eliminates one network hop that the server would otherwise have had to
make and (b) each endpoint node has to deal with only a subset of the total set of mutations.
> For convenience, we also define a default reduce task, called ColumnFamilyOutputReducer,
which collects the columns in the input value and maps them to a data structure expected by
Cassandra. By default, it assumes the input value to be in the form of a ColumnWritable, which
denotes a name-value pair corresponding to a certain column. This reduce task is in turn used
by the attached test case, which maps every <key, value> pair in a sample input sequence
file to a <key, column> pair, and then reduces them by aggregating columns corresponding
to the same key. Eventually, the batched <key, columns> pairs are written to the column
family associated with the output format.
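
The write-back flow described above (buffer mutations per endpoint in the record writer, then send them from the output committer with one thread per endpoint) can be sketched roughly as follows. This is only an illustration under stated assumptions: EndpointBatcher, the endpointFor lookup, and the sender callback are hypothetical names, mutations are modelled as plain strings, and nothing here talks to Cassandra or reproduces the classes in the patch. It also assumes Java 8 or later for the lambdas.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch of the batching idea; not ColumnFamilyRecordWriter or its committer.
final class EndpointBatcher {
    private final Map<String, List<String>> byEndpoint = new HashMap<>();
    private final Function<String, String> endpointFor; // assumed key -> endpoint lookup

    EndpointBatcher(Function<String, String> endpointFor) {
        this.endpointFor = endpointFor;
    }

    // Record-writer side: only buffers the mutation in memory, grouped by endpoint.
    void write(String key, String mutation) {
        String endpoint = endpointFor.apply(key);
        byEndpoint.computeIfAbsent(endpoint, e -> new ArrayList<>()).add(key + ":" + mutation);
    }

    // Committer side: sends each endpoint's batch on its own thread and waits for all of them.
    void commit(BiConsumer<String, List<String>> sender)
            throws InterruptedException, ExecutionException {
        if (byEndpoint.isEmpty()) {
            return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(byEndpoint.size());
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Map.Entry<String, List<String>> entry : byEndpoint.entrySet()) {
                pending.add(pool.submit(() -> sender.accept(entry.getKey(), entry.getValue())));
            }
            for (Future<?> f : pending) {
                f.get(); // rethrows a failure from any endpoint's batch
            }
        } finally {
            pool.shutdown();
        }
        byEndpoint.clear();
    }
}
```

In a real output format the sender would presumably issue the batch mutate request to the given endpoint; here it is left as a callback so the grouping and the per-endpoint parallelism can be seen in isolation.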

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
