incubator-cassandra-user mailing list archives

From Shahab Yunus <shahab.yu...@gmail.com>
Subject Re: get float column in cassandra mapreduce
Date Sat, 05 Oct 2013 15:43:18 GMT
A couple of things I can think of; others might have better ideas.

1- The exception is about an encoding mismatch. Do you know what your
source file's encoding is, and what your system's default is? E.g. it can be
ISO-8859-1 on Windows, UTF-8 on Linux, etc., and your file has something
else. You can explicitly use UTF-8 everywhere if you want; there is a wealth
of information available on the net if you google it.
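
For instance, you can decode the bytes explicitly as UTF-8 and handle bad
input yourself instead of letting it kill the whole task. An untested
sketch, reusing the 'column' variable from your mapper loop:

    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.Charset;

    try
    {
        // duplicate() so the shared buffer's position is left untouched
        String s = Charset.forName("UTF-8").newDecoder()
                          .decode(column.getValue().duplicate())
                          .toString();
    }
    catch (CharacterCodingException e)
    {
        // bytes are not valid UTF-8 text -- log and skip this column
        // instead of failing the whole map task
    }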

2- This is more of an aside: you are parsing your data to a double and a
String without checking which column it is. Basically you do the following
two conversions in all cases, no matter what the column, so what will
happen when the column is date and the toDouble statement is called?

String value1 = ByteBufferUtil.string(column.getValue());
double value2 = ByteBufferUtil.toDouble(column.getValue());

Did it ever work?
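
A safer shape is to branch on the column name and apply only the matching
conversion. An untested sketch (it assumes 'date' is a UTF-8 text column
and 'temprature' is stored as a double; if it is really a float, use
ByteBufferUtil.toFloat instead):

    String date = null;
    Double temp = null;
    for (Entry<String, ByteBuffer> column : columns.entrySet())
    {
        if ("date".equalsIgnoreCase(column.getKey()))
            date = ByteBufferUtil.string(column.getValue());    // text column only
        else if ("temprature".equalsIgnoreCase(column.getKey()))
            temp = ByteBufferUtil.toDouble(column.getValue());  // numeric column only
    }
    if (date != null && temp != null)
        System.out.println(date + " -> " + temp);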


3- Is the column name in your source data files 'temperature' or
'temprature'? You are using the latter in your code, and if it is not what
is in the data then you might be trying to parse an empty or malformed
string.
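
You can verify quickly by dumping the keys the mapper actually receives
(again just a sketch, using your existing loop):

    for (Entry<String, ByteBuffer> column : columns.entrySet())
        System.out.println("column key seen by mapper: " + column.getKey());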

Regards,
Shahab


On Sat, Oct 5, 2013 at 5:16 AM, Anseh Danesh <anseh.danesh@gmail.com> wrote:

> Hi all... I have a question. In the Cassandra word count MapReduce with
> CQL3, I want to get a string column and a float (or double) column as the
> map input key and value. I mean I want to get the date column of type
> string as the key and the temprature column of type float as the value,
> but when I println the value of temprature it prints some of them and
> then errors out....
>
>
> Here is the code:
> package org.apache.cassandra.com;
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.util.*;
> import java.util.Map.Entry;
>
> import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
> import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
> import org.apache.cassandra.hadoop.ConfigHelper;
> import org.apache.cassandra.utils.ByteBufferUtil;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.nio.charset.CharacterCodingException;
>
>
> public class dewpoint extends Configured implements Tool
> {
>     private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
>     static final String KEYSPACE = "weather";
>     static final String COLUMN_FAMILY = "momentinfo";
>
>     static final String OUTPUT_REDUCER_VAR = "output_reducer";
>     static final String OUTPUT_COLUMN_FAMILY = "output_words";
>
>     private static final String OUTPUT_PATH_PREFIX = "/tmp/dewpointt";
>
>     private static final String PRIMARY_KEY = "row_key";
>
>     public static void main(String[] args) throws Exception
>     {
>         // Let ToolRunner handle generic command-line options
>         ToolRunner.run(new Configuration(), new dewpoint(), args);
>         System.exit(0);
>     }
>
>     public static class TokenizerMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
>     {
>         private final static IntWritable one = new IntWritable(1);
>         private Text date = new Text();
>
>
>         public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
>         {
>             for (Entry<String, ByteBuffer> column : columns.entrySet())
>             {
>                 if (!"date".equalsIgnoreCase(column.getKey()) && !"temprature".equalsIgnoreCase(column.getKey()))
>                     continue;
>
>                 String value1 = ByteBufferUtil.string(column.getValue());
>                 double value2 = ByteBufferUtil.toDouble(column.getValue());
>                 System.out.println(value2);
> .....
>
>
> And here is the error:
>
> 13/10/05 12:36:22 INFO com.dewpoint: output reducer type: filesystem
> 13/10/05 12:36:24 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> 13/10/05 12:36:24 WARN mapred.JobClient: No job jar file set.  User
> classes may not be found. See JobConf(Class) or JobConf#setJar(String).
> 13/10/05 12:36:26 INFO mapred.JobClient: Running job:
> job_local1875596001_0001
> 13/10/05 12:36:27 INFO mapred.LocalJobRunner: Waiting for map tasks
> 13/10/05 12:36:27 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000000_0
> 13/10/05 12:36:27 INFO util.ProcessTree: setsid exited with exit code 0
> 13/10/05 12:36:27 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1e2670b
> 13/10/05 12:36:27 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((5366152502320075885, '9070993788622720120] @[localhost])
> 13/10/05 12:36:27 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:27 INFO mapred.JobClient:  map 0% reduce 0%
> 13/10/05 12:36:28 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:28 INFO mapred.MapTask: record buffer = 262144/327680
> 6.00457842484433E-67
> 13/10/05 12:36:30 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:30 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:30 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000001_0
> 13/10/05 12:36:30 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1579a30
> 13/10/05 12:36:30 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((-5699318449577318512, '-2034684803435882987]
> @[localhost])
> 13/10/05 12:36:30 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:32 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:32 INFO mapred.MapTask: record buffer = 262144/327680
> 6.004578424845004E-67
> 13/10/05 12:36:32 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:32 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:32 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000002_0
> 13/10/05 12:36:32 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@112da40
> 13/10/05 12:36:32 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((1684704676388456087, '5366152502320075885] @[localhost])
> 13/10/05 12:36:32 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:32 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:32 INFO mapred.MapTask: record buffer = 262144/327680
> 1.4273722733722645E-71
> 13/10/05 12:36:32 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:32 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:32 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000003_0
> 13/10/05 12:36:32 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@126a29c
> 13/10/05 12:36:32 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((-9223372036854775808, '-5699318449577318512]
> @[localhost])
> 13/10/05 12:36:32 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:33 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:33 INFO mapred.LocalJobRunner:
> 13/10/05 12:36:33 INFO mapred.MapTask: record buffer = 262144/327680
> 6.00457842484433E-67
> 13/10/05 12:36:33 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:33 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:33 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000004_0
> 13/10/05 12:36:33 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2f2295
> 13/10/05 12:36:33 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((-2034684803435882987, '1684704676388456087] @[localhost])
> 13/10/05 12:36:33 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:34 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:34 INFO mapred.MapTask: record buffer = 262144/327680
> 13/10/05 12:36:34 INFO mapred.JobClient:  map 16% reduce 0%
> 6.004595404242602E-67
> 13/10/05 12:36:34 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:34 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:34 INFO mapred.LocalJobRunner: Starting task:
> attempt_local1875596001_0001_m_000005_0
> 13/10/05 12:36:34 INFO mapred.Task:  Using ResourceCalculatorPlugin :
> org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1706da8
> 13/10/05 12:36:34 INFO mapred.MapTask: Processing split:
> ColumnFamilySplit((9070993788622720120, '-9223372036854775808] @[localhost])
> 13/10/05 12:36:34 INFO mapred.MapTask: io.sort.mb = 100
> 13/10/05 12:36:34 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/10/05 12:36:34 INFO mapred.MapTask: record buffer = 262144/327680
> 6.004601064041352E-67
> 13/10/05 12:36:34 INFO mapred.MapTask: Starting flush of map output
> 13/10/05 12:36:34 INFO mapred.MapTask: Finished spill 0
> 13/10/05 12:36:34 INFO mapred.LocalJobRunner: Map task executor complete.
> 13/10/05 12:36:34 WARN mapred.LocalJobRunner: job_local1875596001_0001
> java.lang.Exception: java.nio.charset.MalformedInputException: Input
> length = 1
>     at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
> Caused by: java.nio.charset.MalformedInputException: Input length = 1
>     at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
>     at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:781)
>     at
> org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167)
>     at
> org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124)
>     at
> org.apache.cassandra.com.dewpoint$TokenizerMapper.map(dewpoint.java:65)
>     at
> org.apache.cassandra.com.dewpoint$TokenizerMapper.map(dewpoint.java:1)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
>     at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> 13/10/05 12:36:35 INFO mapred.JobClient: Job complete:
> job_local1875596001_0001
> 13/10/05 12:36:35 INFO mapred.JobClient: Counters: 15
> 13/10/05 12:36:35 INFO mapred.JobClient:   FileSystemCounters
> 13/10/05 12:36:35 INFO mapred.JobClient:     FILE_BYTES_READ=2713
> 13/10/05 12:36:35 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=53478
> 13/10/05 12:36:35 INFO mapred.JobClient:   File Input Format Counters
> 13/10/05 12:36:35 INFO mapred.JobClient:     Bytes Read=0
> 13/10/05 12:36:35 INFO mapred.JobClient:   Map-Reduce Framework
> 13/10/05 12:36:35 INFO mapred.JobClient:     Map output materialized
> bytes=23
> 13/10/05 12:36:35 INFO mapred.JobClient:     Combine output records=1
> 13/10/05 12:36:35 INFO mapred.JobClient:     Map input records=1
> 13/10/05 12:36:35 INFO mapred.JobClient:     Physical memory (bytes)
> snapshot=0
> 13/10/05 12:36:35 INFO mapred.JobClient:     Spilled Records=1
> 13/10/05 12:36:35 INFO mapred.JobClient:     Map output bytes=15
> 13/10/05 12:36:35 INFO mapred.JobClient:     CPU time spent (ms)=0
> 13/10/05 12:36:35 INFO mapred.JobClient:     Total committed heap usage
> (bytes)=363921408
> 13/10/05 12:36:35 INFO mapred.JobClient:     Virtual memory (bytes)
> snapshot=0
> 13/10/05 12:36:35 INFO mapred.JobClient:     Combine input records=1
> 13/10/05 12:36:35 INFO mapred.JobClient:     Map output records=1
> 13/10/05 12:36:35 INFO mapred.JobClient:     SPLIT_RAW_BYTES=103
>
> What does it mean?
>
