From: aaron morton <aaron@thelastpickle.com>
Subject: Re: ColumnFamilyOutputFormat problem
Date: Thu, 11 Aug 2011 09:40:04 +1200
To: user@cassandra.apache.org
Message-Id: <375D0D7F-933B-4F68-99F0-73B9CBEB8E65@thelastpickle.com>

> Seems the data are not actually written to Cassandra.

Before jumping into the Hadoop side of things: are you saying there is no data in Cassandra? Can you retrieve any of it using the CLI? Take a look at cfstats on each node to see the estimated record count.

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 11 Aug 2011, at 08:20, Jian Fang wrote:

> Hi,
>
> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file and then writes about 2.5 million records
> to Cassandra, using ColumnFamilyOutputFormat. My Cassandra cluster has three nodes, with
> one Hadoop task tracker on each node. The weird problem is that I only see one map task and one reduce task, and the job
> took only 53 seconds to finish. It seems the data are not actually written to Cassandra.
>
> Here is the status from the Hadoop web admin:
>
> User: hadoop
> Job Name: SalesRankWriter
> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
> Job Setup: Successful
> Status: Succeeded
> Started at: Wed Aug 10 15:24:43 EDT 2011
> Finished at: Wed Aug 10 15:25:36 EDT 2011
> Finished in: 52sec
> Job Cleanup: Successful
>
> Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
> map      100.00%      1           0         0         1          0        0 / 0
> reduce   100.00%      1           0         0         1          0        0 / 0
>
> Counter                                        Map           Reduce       Total
> Job Counters         Launched reduce tasks     0             0            1
>                      Launched map tasks        0             0            1
>                      Data-local map tasks      0             0            1
> FileSystemCounters   FILE_BYTES_READ           50,698,700    50,698,646   101,397,346
>                      HDFS_BYTES_READ           56,149,360    0            56,149,360
>                      FILE_BYTES_WRITTEN        101,397,378   50,698,646   152,096,024
> Map-Reduce           Reduce input groups       0             2,534,932    2,534,932
> Framework            Combine output records    0             0            0
>                      Map input records         2,534,932     0            2,534,932
>                      Reduce shuffle bytes      0             0            0
>                      Reduce output records     0             2,534,932    2,534,932
>                      Spilled Records           5,069,864     2,534,932    7,604,796
>                      Map output bytes          45,628,776    0            45,628,776
>                      Map output records        2,534,932     0            2,534,932
>                      Combine input records     0             0            0
>                      Reduce input records      0             2,534,932    2,534,932
>
> and the log for the mapper:
>
> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
>
> and the log for the reducer:
>
> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and 0 dup hosts)
> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and 0 dup hosts)
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
>
> My code is similar to the word count example:
>
>     public int run(String[] args) throws Exception {
>         ...
>         getConf().set(CONF_COLUMN_NAME, columnName);
>
>         Job job4 = new Job(getConf(), "SalesRankWriter");
>         job4.setJarByClass(SalesRankLoader.class);
>         job4.setMapperClass(RankUpdateMapper.class);
>         job4.setReducerClass(RankUpdateReducer.class);
>         job4.setMapOutputKeyClass(Text.class);
>         job4.setMapOutputValueClass(IntWritable.class);
>         job4.setOutputKeyClass(ByteBuffer.class);
>         job4.setOutputValueClass(List.class);
>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>         job4.setInputFormatClass(TextInputFormat.class);
>         FileInputFormat.addInputPath(job4, new Path(prePath));
>
>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>         ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>         ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>
>         job4.waitForCompletion(true);
>         ...
>     }
>
> where the mapper and reducer are defined as:
>
>     public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>             String line = value.toString();
>             StringTokenizer tokenizer = new StringTokenizer(line);
>             String ean = tokenizer.nextToken();
>             int rank = Integer.parseInt(tokenizer.nextToken());
>
>             context.write(new Text(ean), new IntWritable(rank));
>         }
>     }
>
>     public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
>     {
>         private ByteBuffer outputKey;
>
>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>         {
>             outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>         }
>
>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>         {
>             context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>         }
>
>         private static Mutation getMutation(Text key, int value)
>         {
>             Column c = new Column();
>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>             c.setTimestamp(System.currentTimeMillis() * 1000);
>
>             Mutation m = new Mutation();
>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>             m.column_or_supercolumn.setColumn(c);
>             return m;
>         }
>     }
>
> Anything wrong here?
>
> Thanks,
>
> John
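[Editor's note: seeing a single map task here is expected rather than a symptom. With TextInputFormat the map-task count follows the number of input splits, and the job's HDFS_BYTES_READ (56,149,360 bytes) fits inside one split at Hadoop 0.20's usual 64 MB default block size, an assumption about this cluster's config; the single reducer is likewise just the default mapred.reduce.tasks=1. A back-of-the-envelope sketch:]

```java
public class SplitCountSketch {
    // Map-task count ~= number of input splits = ceil(fileSize / splitSize).
    static long splitCount(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize; // ceiling division
    }

    public static void main(String[] args) {
        long inputBytes = 56_149_360L;       // HDFS_BYTES_READ from the job counters above
        long blockSize = 64L * 1024 * 1024;  // assumed default dfs.block.size in Hadoop 0.20
        System.out.println(splitCount(inputBytes, blockSize)); // 1 -> a single map task
    }
}
```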
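[Editor's note: the `Arrays.copyOf(key.getBytes(), key.getLength())` dance in the quoted getMutation() matters because Hadoop's Text reuses its backing byte array, so getBytes() can return more bytes than are currently valid. A JDK-only sketch of the difference; the byte values are made up for illustration:]

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class TextBytesSketch {
    // Copy only the valid prefix of a possibly oversized backing array,
    // mirroring what getMutation() does with Text.getBytes()/getLength().
    static ByteBuffer validPrefix(byte[] backing, int length) {
        return ByteBuffer.wrap(Arrays.copyOf(backing, length));
    }

    public static void main(String[] args) {
        // Simulated Text internals: 4 valid bytes followed by stale data.
        byte[] backing = {'e', 'a', 'n', '1', 'X', 'X'};
        System.out.println(ByteBuffer.wrap(backing).remaining()); // 6 - includes stale bytes
        System.out.println(validPrefix(backing, 4).remaining());  // 4 - the intended column name
    }
}
```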
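[Editor's note: the `c.setTimestamp(System.currentTimeMillis() * 1000)` line in getMutation() reflects Cassandra's convention of microsecond column timestamps, while the JDK clock returns milliseconds. A minimal sketch of the conversion, with a hypothetical instant:]

```java
public class MicrosTimestampSketch {
    // Cassandra column timestamps are conventionally microseconds since
    // the epoch; System.currentTimeMillis() gives milliseconds.
    static long microsFromMillis(long millis) {
        return millis * 1000L;
    }

    public static void main(String[] args) {
        long millis = 1312954800000L; // a hypothetical instant, in milliseconds
        System.out.println(microsFromMillis(millis)); // 1312954800000000
    }
}
```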