incubator-cassandra-user mailing list archives

From Filippo Diotalevi <fili...@ntoklo.com>
Subject Re: cassandra-hadoop mapper
Date Thu, 31 May 2012 09:20:20 GMT
Hi,  
yes, the work can be split between different mappers, but each one will process one row at
a time. In fact, the method

>  public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)

processes one row, identified by the ByteBuffer key, with its columns in the
SortedMap<ByteBuffer, IColumn> argument.


That doesn't mean you will make millions of requests to Cassandra to retrieve one row at a
time, though. Requests are batched, and the parameter
cassandra.range.batch.size
determines "The number of rows to request with each get range slices request" (as per the javadoc).
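
For example, you can raise the batch size when you configure the job. A minimal sketch: the
property name comes from the javadoc quoted above, 4096 is just an illustrative value, and
ConfigHelper.setRangeBatchSize is the typed equivalent if your Cassandra version ships it:

    // in your job setup, assuming a standard Hadoop Job object named "job"
    Configuration conf = job.getConfiguration();
    // fetch up to 4096 rows per get_range_slices call instead of the default
    conf.set("cassandra.range.batch.size", "4096");
    // equivalently, if the helper is available in your version:
    // ConfigHelper.setRangeBatchSize(conf, 4096);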

Performance-wise, that shouldn't be a problem: the operation you are doing is very simple,
and Cassandra will be quick to retrieve such short rows.
In any case, your business logic works well in parallel, so you can split the job between
many concurrent mappers and distribute the work among them.
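
The number of concurrent mappers is driven by how the input is split. As a sketch (assuming
the ConfigHelper shipped with the cassandra-hadoop package; 65536 is an arbitrary
illustrative value and the default varies by version), you can shrink the split size so each
map task covers fewer rows and more tasks run in parallel:

    // also in job setup: smaller splits mean more map tasks running concurrently
    // (each input split covers roughly this many rows of the column family)
    ConfigHelper.setInputSplitSize(job.getConfiguration(), 65536);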

--  
Filippo



On Thursday, 31 May 2012 at 09:59, murat migdisoglu wrote:

>  
> Hi,  
>  
> I'm working on some use cases to understand how cassandra-hadoop integration works. 

>  
> I have a very basic scenario: I have a column family that keeps the session id and some
BSON data containing the username in two separate columns. I want to go through all rows
and dump a row to a file when its username matches a certain criterion. And I don't
need any Reducer or Combiner for now.
>  
> After I've written the following very simple Hadoop job, I see from the logs that my
mapper function is called once per row. Is that normal? If that is the case, doing such a
search operation on a big dataset would take hours if not days... Besides that, I see many
small output files being created on HDFS.
>  
> I guess I need a better understanding of how exactly splitting the job into tasks works.
>  
>  
>     @Override
>     public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
>     throws IOException, InterruptedException
>     {
>         String rowkey = ByteBufferUtil.string(key);
>         String ip = context.getConfiguration().get(IP);
>         IColumn column = columns.get(sourceColumn);
>         if (column == null)
>             return;
>         ByteBuffer byteBuffer = column.value();
>         // duplicate() so decoding the BSON below doesn't consume the buffer we write out
>         ByteBuffer bb2 = byteBuffer.duplicate();
>
>         DataConvertor convertor = fromBson(byteBuffer, DataConvertor.class);
>         String username = convertor.getUsername();
>         BytesWritable value = new BytesWritable();
>         if (username != null && username.equals(ip)) {
>             byte[] arr = convertToByteArray(bb2);
>             value.set(arr, 0, arr.length);
>             Text tkey = new Text(rowkey);
>             context.write(tkey, value);
>         } else {
>             log.info("ip not match [" + ip + "]");
>         }
>     }
>  
> Thanks in advance
> Kind Regards
>  
>  
> --  
> "Find a job you enjoy, and you'll never work a day in your life."
> Confucius  
>  

