cassandra-commits mailing list archives

From "Michael Kjellman (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CASSANDRA-4912) BulkOutputFormat should support Hadoop MultipleOutput
Date Thu, 08 Nov 2012 22:54:12 GMT

    [ https://issues.apache.org/jira/browse/CASSANDRA-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13493577#comment-13493577 ]

Michael Kjellman edited comment on CASSANDRA-4912 at 11/8/12 10:54 PM:
-----------------------------------------------------------------------

When checkOutputSpecs() runs in local mode during job setup, no exceptions are thrown. When a
reducer is created, however, org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily
throws an UnsupportedOperationException because the output column family isn't set up. It looks
like mapreduce.output.basename is null.

See the attached Example.java for a stripped-down example MR job.
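
For reference, the check that appears to fire is something like the following. This is only a
sketch of the suspected behavior, not the actual Cassandra source; it assumes ConfigHelper keeps
the output column family under a key such as cassandra.output.columnfamily, while
MultipleOutputs.addNamedOutput() only populates mapreduce.output.basename, a key ConfigHelper
never reads:

public static String getOutputColumnFamily(Configuration conf)
{
    // hypothetical key name, for illustration only; named outputs registered
    // via MultipleOutputs write mapreduce.output.basename instead, so this
    // lookup comes back null and the exception below is thrown
    String cf = conf.get("cassandra.output.columnfamily");
    if (cf == null)
        throw new UnsupportedOperationException("output column family not set");
    return cf;
}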
                
      was (Author: mkjellman):
    When checkOutputSpecs() runs in local mode during job setup, no exceptions are thrown. When a
reducer is created, however, org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily
throws an UnsupportedOperationException because the output column family isn't set up. It looks
like mapreduce.output.basename is null.

The job config is something along the lines of:

public int run(String[] args) throws Exception
{
    Job job = new Job(getConf(), "MRJobName");

    job.setJarByClass(Nashoba.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    // use 3 reducers
    job.setNumReduceTasks(3);

    // thrift input job settings
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // thrift output job settings
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // set timeout to 1 hour for testing
    job.getConfiguration().set("mapreduce.task.timeout", "3600000");
    job.getConfiguration().set("mapred.task.timeout", "3600000");

    job.getConfiguration().set("mapreduce.output.bulkoutputformat.buffersize", "64");
    job.setOutputFormatClass(BulkOutputFormat.class);
    ConfigHelper.setRangeBatchSize(getConf(), 99);

    // let ConfigHelper know what column family to get data from and where to output it
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY);

    ConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY1, BulkOutputFormat.class, ByteBuffer.class, List.class);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY2, BulkOutputFormat.class, ByteBuffer.class, List.class);

    // what classes the mapper will write and what the reducer should expect to receive
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    // empty start/finish selects the entire row
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    SlicePredicate predicate = new SlicePredicate();
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    return job.waitForCompletion(true) ? 0 : 1;
}

public static class ReducerToCassandra extends Reducer<Text, MapWritable, ByteBuffer, List<Mutation>>
{
    private MultipleOutputs<ByteBuffer, List<Mutation>> output;

    @Override
    public void setup(Context context)
    {
        output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);
    }

    @Override
    public void reduce(Text word, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException
    {
        // ... reducer logic elided; it produces key and val ...

        // report progress so the task isn't killed during long writes
        context.progress();
        // write our result out to two column families via the named outputs
        output.write(OUTPUT_COLUMN_FAMILY1, key, Collections.singletonList(getMutation1(word, val)));
        output.write(OUTPUT_COLUMN_FAMILY2, key, Collections.singletonList(getMutation2(word, val)));
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException
    {
        output.close(); // closes all of the opened outputs
    }
}
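
A possible stopgap until named outputs are supported (sketch only, not verified): pin a default
output column family in the job config so the ConfigHelper lookup succeeds. Every stream then
targets the same column family, which defeats the point of MultipleOutputs, but it should get
past the UnsupportedOperationException:

// hypothetical workaround, untested: give getOutputColumnFamily() something to return
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY1);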
                  
> BulkOutputFormat should support Hadoop MultipleOutput
> -----------------------------------------------------
>
>                 Key: CASSANDRA-4912
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4912
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Hadoop
>    Affects Versions: 1.2.0 beta 1
>            Reporter: Michael Kjellman
>
> Much like CASSANDRA-4208, BOF should support outputting to multiple column families. The
> current approach taken in the patch for COF results in only one stream being sent.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
