cassandra-user mailing list archives

From: aaron morton <aa...@thelastpickle.com>
Subject: Re: ColumnFamilyOutputFormat problem
Date: Wed, 10 Aug 2011 21:40:04 GMT
> It seems the data are not actually written to Cassandra.

Before jumping into the Hadoop side of things, are you saying there is no data in Cassandra? Can you retrieve any using the CLI? Take a look at cfstats on each node to see the estimated record count.
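
Something like the following (YourKeyspace / YourColumnFamily are placeholders for your own names; 9260 is the RPC port from your job config below):

    # estimated key count per column family, run against each node
    bin/nodetool -h <node> cfstats

    # spot-check a few rows from the CLI
    bin/cassandra-cli -h <node> -p 9260
    use YourKeyspace;
    list YourColumnFamily limit 5;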

Cheers
 
-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 11 Aug 2011, at 08:20, Jian Fang wrote:

> Hi,
> 
> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file and then writes
> about 2.5 million records to Cassandra. I used ColumnFamilyOutputFormat to write to Cassandra.
> My Cassandra cluster has three nodes, with one Hadoop task tracker on each node. The weird
> problem is that I only saw one map task and one reduce task, and the job took only 53 seconds
> to finish. It seems the data are not actually written to Cassandra.
> 
> Here is the status from the Hadoop web admin:
> 
> User: hadoop
> Job Name: SalesRankWriter
> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
> Job Setup: Successful
> Status: Succeeded
> Started at: Wed Aug 10 15:24:43 EDT 2011
> Finished at: Wed Aug 10 15:25:36 EDT 2011
> Finished in: 52sec
> Job Cleanup: Successful
> Kind	% Complete	Num Tasks	Pending	Running	Complete	Killed	Failed/Killed Task Attempts
> map	100.00%	1	0	0	1	0	0 / 0
> reduce	100.00%	1	0	0	1	0	0 / 0
> 
> Counter	Map	Reduce	Total
> Job Counters	Launched reduce tasks	0	0	1
> Launched map tasks	0	0	1
> Data-local map tasks	0	0	1
> FileSystemCounters	FILE_BYTES_READ	50,698,700	50,698,646	101,397,346
> HDFS_BYTES_READ	56,149,360	0	56,149,360
> FILE_BYTES_WRITTEN	101,397,378	50,698,646	152,096,024
> Map-Reduce Framework	Reduce input groups	0	2,534,932	2,534,932
> Combine output records	0	0	0
> Map input records	2,534,932	0	2,534,932
> Reduce shuffle bytes	0	0	0
> Reduce output records	0	2,534,932	2,534,932
> Spilled Records	5,069,864	2,534,932	7,604,796
> Map output bytes	45,628,776	0	45,628,776
> Map output records	2,534,932	0	2,534,932
> Combine input records	0	0	0
> Reduce input records	0	2,534,932	2,534,932
> 
> and here is the log for the mapper:
> 
> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
> 
> and here is the log for the reducer:
> 
> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
> 
> My code is similar to the word count example:
> 
>     public int run(String[] args) throws Exception {
>         ...
> 
>         getConf().set(CONF_COLUMN_NAME, columnName);
> 
>         Job job4 = new Job(getConf(), "SalesRankWriter");
>         job4.setJarByClass(SalesRankLoader.class);
>         job4.setMapperClass(RankUpdateMapper.class);
>         job4.setReducerClass(RankUpdateReducer.class);
>         job4.setMapOutputKeyClass(Text.class);
>         job4.setMapOutputValueClass(IntWritable.class);
>         job4.setOutputKeyClass(ByteBuffer.class);
>         job4.setOutputValueClass(List.class);
>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>         job4.setInputFormatClass(TextInputFormat.class);
>         FileInputFormat.addInputPath(job4, new Path(prePath));
> 
>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>         ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>         ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
> 
> 
>         job4.waitForCompletion(true);
>         ...
>     }
> 
> 
> where the mapper and reducer are defined as:
> 
>     public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>             String line = value.toString();
>             StringTokenizer tokenizer = new StringTokenizer(line);
>             String ean = tokenizer.nextToken();
>             int rank = Integer.parseInt(tokenizer.nextToken());
> 
>             context.write(new Text(ean), new IntWritable(rank));
>         }
>     }
> 
>     public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
>     {
>         private ByteBuffer outputKey;
> 
>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>         {
>             outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>         }
> 
>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>         {
>             context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>         }
> 
>         private static Mutation getMutation(Text key, int value)
>         {
>             Column c = new Column();
>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>             c.setTimestamp(System.currentTimeMillis() * 1000);
> 
>             Mutation m = new Mutation();
>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>             m.column_or_supercolumn.setColumn(c);
>             return m;
>         }
>     }
> 
> Anything wrong here?
> 
> Thanks,
> 
> John

