incubator-cassandra-user mailing list archives

From aaron morton <aa...@thelastpickle.com>
Subject Re: ColumnFamilyOutputFormat problem
Date Tue, 16 Aug 2011 23:25:15 GMT
I suggested turning up the logging to see whether the server processes a batch_mutate call. That logging lives in the CassandraServer class (https://github.com/apache/cassandra/blob/cassandra-0.8.4/src/java/org/apache/cassandra/thrift/CassandraServer.java#L531), not in the CFOF.

The first step is to determine whether a call is being made to the server at all. If not, work out why not. If so, work out where the data is going.
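
To turn that logging up on the server side, you can bump the level for that class in conf/log4j-server.properties on one node and watch system.log while the job runs. A minimal sketch, assuming the stock 0.8 log4j layout:

    # conf/log4j-server.properties (on one Cassandra node)
    # CassandraServer logs each Thrift call it handles, including batch_mutate, at DEBUG
    log4j.logger.org.apache.cassandra.thrift.CassandraServer=DEBUG

If no batch_mutate lines show up in system.log during the reduce phase, the mutations are never reaching the server.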

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 17/08/2011, at 1:36 AM, Jian Fang wrote:

> If you look at the source code, you will find there are no log messages in the ColumnFamilyOutputFormat class or the related classes.
> How do I trace the problem then? Has no one actually got this working?
> 
> On Thu, Aug 11, 2011 at 6:10 PM, aaron morton <aaron@thelastpickle.com> wrote:
> Turn the logging up in Cassandra or your MR job and make sure the batch_mutate call is sent. Sounds like it's not.
> 
> Cheers
> 
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
> 
> On 12 Aug 2011, at 07:22, Jian Fang wrote:
> 
>> The 53 seconds included the map phase that reads and processes the input file. The records are written at the end of the reduce phase.
>> 
>> I checked the sales ranks in the update file against the sales ranks in Cassandra; they are different, so the records were not actually updated.
>> 
>> I remember running the word count example for Cassandra 0.8.0 some time ago and seeing similar behavior, i.e., the results were not written to the Cassandra column family, but if I changed Hadoop to write to the file system, I could see the results in the output file. I didn't try the word count example for Cassandra 0.8.2 though.
>> 
>> Any way to solve this problem?
>> 
>> Thanks,
>> 
>> John 
>> On Thu, Aug 11, 2011 at 12:17 AM, aaron morton <aaron@thelastpickle.com> wrote:
>> I'm a simple guy. My first step would be to see if the expected data is in the database and, if not, what's missing.
>> 
>> 2.5M updates / 3 nodes ≈ 833,333 per node
>> 833,333 / 53 seconds ≈ 15,723 per second
>> 1 / 15,723 ≈ 0.00006 seconds, i.e. about 0.06 milliseconds per mutation
>>  
>> sounds reasonable to me. 
>> 
>> Check the Write Latency in nodetool cfstats and the row count estimates.
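>> 
>> For example, run something like this against each node (the host name is a placeholder for your own; it uses the default JMX port):
>> 
>>     nodetool -h <node-host> cfstats
>> 
>> and compare the Write Latency and the key estimate for your column family before and after the job.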
>> 
>> Cheers
>> 
>> -----------------
>> Aaron Morton
>> Freelance Cassandra Developer
>> @aaronmorton
>> http://www.thelastpickle.com
>> 
>> On 11 Aug 2011, at 14:50, Jian Fang wrote:
>> 
>>> There is data, and each Cassandra cluster node holds about 100G. From the application point of view, if I run the job twice with the same input file (the sales rank update file), then the output file from the second run should contain a much smaller number of products whose rank change exceeds the threshold, because the first run should already have updated the sales ranks to match the ranks in the input file. But in fact the output files stay the same for the two runs. One explanation is that the ranks were not actually updated by the first run. Also, is 53 seconds a plausible time for the whole Hadoop job with 2.5 million Cassandra updates on three nodes? Each node is a regular Linux box with 8 CPUs.
>>> 
>>> Thanks,
>>> 
>>> John  
>>> 
>>> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aaron@thelastpickle.com> wrote:
>>>> Seems the data is not actually being written to Cassandra.
>>> 
>>> Before jumping into the Hadoop side of things, are you saying there is no data in Cassandra? Can you retrieve any using the CLI? Take a look at cfstats on each node to see the estimated record count.
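>>> 
>>> For example, with the command-line client, something like this (host, keyspace, and column family names are placeholders for your own; 9260 is the rpc_port from your job config):
>>> 
>>>     bin/cassandra-cli -h <node-host> -p 9260
>>>     use <Keyspace>;
>>>     list <ColumnFamily> limit 5;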
>>> 
>>> Cheers
>>>  
>>> -----------------
>>> Aaron Morton
>>> Freelance Cassandra Developer
>>> @aaronmorton
>>> http://www.thelastpickle.com
>>> 
>>> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file and then writes about 2.5 million records
>>>> to Cassandra, using ColumnFamilyOutputFormat. My Cassandra cluster has three nodes, with one Hadoop task tracker on each node.
>>>> The weird problem is that I only saw one map task and one reduce task, and the job only took 53 seconds to finish.
>>>> It seems the data is not actually being written to Cassandra.
>>>> 
>>>> Here is the status from the Hadoop web admin:
>>>> 
>>>> User: hadoop
>>>> Job Name: SalesRankWriter
>>>> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>>>> Job Setup: Successful
>>>> Status: Succeeded
>>>> Started at: Wed Aug 10 15:24:43 EDT 2011
>>>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>>>> Finished in: 52sec
>>>> Job Cleanup: Successful
>>>> Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
>>>> map      100.00%      1           0         0         1          0        0 / 0
>>>> reduce   100.00%      1           0         0         1          0        0 / 0
>>>> 
>>>> Counter	Map	Reduce	Total
>>>> Job Counters	Launched reduce tasks	0	0	1
>>>> Launched map tasks	0	0	1
>>>> Data-local map tasks	0	0	1
>>>> FileSystemCounters	FILE_BYTES_READ	50,698,700	50,698,646	101,397,346
>>>> HDFS_BYTES_READ	56,149,360	0	56,149,360
>>>> FILE_BYTES_WRITTEN	101,397,378	50,698,646	152,096,024
>>>> Map-Reduce Framework	Reduce input groups	0	2,534,932	2,534,932
>>>> Combine output records	0	0	0
>>>> Map input records	2,534,932	0	2,534,932
>>>> Reduce shuffle bytes	0	0	0
>>>> Reduce output records	0	2,534,932	2,534,932
>>>> Spilled Records	5,069,864	2,534,932	7,604,796
>>>> Map output bytes	45,628,776	0	45,628,776
>>>> Map output records	2,534,932	0	2,534,932
>>>> Combine input records	0	0	0
>>>> Reduce input records	0	2,534,932	2,534,932
>>>> 
>>>> and the log for the mapper
>>>> 
>>>>  2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing
JVM Metrics with processName=MAP, sessionId=
>>>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb
= 100
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer
= 79691776/99614720
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer
= 262144/327680
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart =
0; bufend = 4718592; bufvoid = 99614720
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart =
0; kvend = 262144; length = 327680
>>>> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill
0
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart =
4718592; bufend = 9437166; bufvoid = 99614720
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart =
262144; kvend = 196607; length = 327680
>>>> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill
1
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart =
9437166; bufend = 14155740; bufvoid = 99614720
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart =
196607; kvend = 131070; length = 327680
>>>> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill
2
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart =
14155740; bufend = 18874314; bufvoid = 99614720
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart =
131070; kvend = 65533; length = 327680
>>>> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill
3
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart =
18874314; bufend = 23592906; bufvoid = 99614720
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart =
65533; kvend = 327677; length = 327680
>>>> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill
4
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart =
23592906; bufend = 28311480; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart =
327677; kvend = 262140; length = 327680
>>>> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill
5
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart =
28311480; bufend = 33030054; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart =
262140; kvend = 196603; length = 327680
>>>> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill
6
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart =
33030054; bufend = 37748628; bufvoid = 99614720
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart =
196603; kvend = 131066; length = 327680
>>>> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill
7
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map
output: record full = true
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart =
37748628; bufend = 42467202; bufvoid = 99614720
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart =
131066; kvend = 65529; length = 327680
>>>> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill
8
>>>> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush
of map output
>>>> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill
9
>>>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10
sorted segments
>>>> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the
last merge-pass, with 10 segments left of total size: 50698660 bytes
>>>> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0
is done. And is in the process of commiting
>>>> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0'
done.
>>>> 
>>>> and the log for the reducer
>>>> 
>>>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing
JVM Metrics with processName=SHUFFLE, sessionId=
>>>> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager:
MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread started: Thread for merging on-disk files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread started: Thread for merging in memory files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread waiting: Thread for merging on-disk files
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Need another 1 map output(s) where 0 is already in progress
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread started: Thread for polling Map Completion Events
>>>> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Scheduled 0 outputs (0 slow hosts and0 dup hosts)
>>>> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0:
Got 1 new map-outputs
>>>> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Scheduled 1 outputs (0 slow hosts and0 dup hosts)
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header:
attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling
50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646
bytes from map-output for attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread waiting: Thread for merging on-disk files
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread
exiting
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread
joined.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed
ram manager
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved
on-disk merge complete: 1 files left.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory
merge complete: 0 files left.
>>>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging
1 files, 50698646 bytes from disk
>>>> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging
0 segments, 0 bytes from memory into reduce
>>>> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted
segments
>>>> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the
last merge-pass, with 1 segments left of total size: 50698642 bytes
>>>> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0
is done. And is in the process of commiting
>>>> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0'
done.
>>>> 
>>>> My code is similar to the word count example:
>>>> 
>>>>     public int run(String[] args) throws Exception {
>>>>         ...
>>>> 
>>>>         getConf().set(CONF_COLUMN_NAME, columnName);
>>>> 
>>>>         Job job4 = new Job(getConf(), "SalesRankWriter");
>>>>         job4.setJarByClass(SalesRankLoader.class);
>>>>         job4.setMapperClass(RankUpdateMapper.class);
>>>>         job4.setReducerClass(RankUpdateReducer.class);
>>>>         job4.setMapOutputKeyClass(Text.class);
>>>>         job4.setMapOutputValueClass(IntWritable.class);
>>>>         job4.setOutputKeyClass(ByteBuffer.class);
>>>>         job4.setOutputValueClass(List.class);
>>>>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>>>         job4.setInputFormatClass(TextInputFormat.class);
>>>>         FileInputFormat.addInputPath(job4, new Path(prePath));
>>>> 
>>>>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>>>>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>>>         ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>>>>         ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>>>> 
>>>> 
>>>>         job4.waitForCompletion(true);
>>>>         ...
>>>>     }
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> where the mapper and reducer are defined as:
>>>> 
>>>>     public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>>>>         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>>>>             String line = value.toString();
>>>>             StringTokenizer tokenizer = new StringTokenizer(line);
>>>>             String ean = tokenizer.nextToken();
>>>>             int rank = Integer.parseInt(tokenizer.nextToken());
>>>> 
>>>>             context.write(new Text(ean), new IntWritable(rank));
>>>>         }
>>>>     }
>>>> 
>>>>     public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
>>>>     {
>>>>         private ByteBuffer outputKey;
>>>> 
>>>>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>>>>         {
>>>>             outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>>>>         }
>>>> 
>>>>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>>>>         {
>>>>             context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>>>>         }
>>>> 
>>>>         private static Mutation getMutation(Text key, int value)
>>>>         {
>>>>             Column c = new Column();
>>>>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>>>>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>>>>             c.setTimestamp(System.currentTimeMillis() * 1000);
>>>> 
>>>>             Mutation m = new Mutation();
>>>>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>>>>             m.column_or_supercolumn.setColumn(c);
>>>>             return m;
>>>>         }
>>>>     }
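>>>> 
>>>> For what it's worth, after the job finishes I can spot-check one EAN directly over Thrift. This is only a rough sketch of that check, assuming the 0.8 Thrift API, reusing the host/port from the job configuration above, and assuming KEYSPACE and columnFamily are visible as constants here; rowKey would be the columnName value that the reducer uses as its output key:
>>>> 
>>>>     // Rough read-back check over raw Thrift; needs org.apache.thrift.transport.*,
>>>>     // org.apache.thrift.protocol.TBinaryProtocol, org.apache.cassandra.thrift.*,
>>>>     // and org.apache.cassandra.utils.ByteBufferUtil on the classpath.
>>>>     private static void checkOneColumn(String rowKey, String ean) throws Exception {
>>>>         TTransport transport = new TFramedTransport(new TSocket("dnjsrcha01", 9260));
>>>>         Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
>>>>         transport.open();
>>>>         client.set_keyspace(KEYSPACE);
>>>> 
>>>>         // The reducer writes every EAN as a column in a single row keyed by the
>>>>         // configured column name, so fetch that row's column named after the EAN.
>>>>         ColumnPath path = new ColumnPath(columnFamily);
>>>>         path.setColumn(ByteBufferUtil.bytes(ean));
>>>>         ColumnOrSuperColumn cosc = client.get(ByteBufferUtil.bytes(rowKey), path, ConsistencyLevel.ONE);
>>>>         System.out.println(ean + " -> " + ByteBufferUtil.string(cosc.column.value));
>>>> 
>>>>         transport.close();
>>>>     }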
>>>> 
>>>> Anything wrong here?
>>>> 
>>>> Thanks,
>>>> 
>>>> John
>>> 
>>> 
>> 
>> 
> 
> 

