hbase-user mailing list archives

From Jean-Marc Spaggiari <jean-m...@spaggiari.org>
Subject Re: [Error]Finding average using hbase hadoop
Date Mon, 19 Aug 2013 13:46:58 GMT
Hi Manish,

I have not even looked at it ;)

What investigation have you done on your side to try to figure out what the
issue is?

Have you added some logging to see the values before they are used, to figure
out what exactly they are? (Something like the sketch just below.)
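
A minimal logging sketch (illustrative only; colfamily2/qul2 and colfamily3/qul3
are the field names from the code quoted below):

    // Print both raw cell values before converting them, so a null shows up clearly:
    byte[] val = value.getValue(colfamily2, qul2);   // stocks:open
    byte[] sym = value.getValue(colfamily3, qul3);   // stocks:symbol
    System.out.println("open=" + (val == null ? "null" : Bytes.toString(val))
        + " symbol=" + (sym == null ? "null" : Bytes.toString(sym)));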
Have you looked at the code to see what it's doing?

What is line 59 in your code?

Also, as asked previously: "Can you please paste your code on pastebin?
Same for the exception." It's easier to look at it that way.

JM


2013/8/19 manish dunani <manishd207@gmail.com>

> Hello Jean,
>
> Did you find it?
>
>
> > On Sun, Aug 18, 2013 at 8:28 AM, manish dunani <manishd207@gmail.com> wrote:
>
> > But I want my output to look like this:
> >
> >
> > ROW                    COLUMN+CELL
> >
> > QXM                    column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
> > QTM                    column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
> >
> >
> > *Sample dataset in HBase* (table name: nyse4):
> >
> >
> >  2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
> >  2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
> >  2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
> >  2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM
> >
> >
> > ===>> In my previous output I didn't get the symbol (qualifier) values as
> > row keys in my table.
> >
> >
> >
> > hbase(main):004:0> scan 'nyse5'
> > ROW                   COLUMN+CELL
> >  symbol               column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
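
An aside on that value: @\xC6o\x11 is not corruption. Bytes.toBytes(float) stores
the 4-byte IEEE 754 encoding (0x40 0xC6 0x6F 0x11 here; the printable bytes render
as '@' and 'o'), and 0x40C66F11 decodes back to 6.201058. A sketch of reading it
back, assuming a Result named result:

    byte[] raw = result.getValue(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"));
    float average = Bytes.toFloat(raw);   // 0x40C66F11 -> 6.201058f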
> >
> > *So I changed my program as follows:*
> >
> >
> > package com.maddy;
> >
> > import java.io.IOException;
> >
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > import org.apache.hadoop.hbase.client.Result;
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
> > import org.apache.hadoop.hbase.util.Bytes;
> > //import org.apache.hadoop.io.DoubleWritable;
> > import org.apache.hadoop.io.FloatWritable;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.hadoop.mapreduce.Reducer;
> >
> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> >
> >
> > public class openaveragestock
> > {
> >     public static class map extends TableMapper<Text,FloatWritable>
> >     {
> >         private static String col_family="stocks";
> >         private static String qul="open";
> >
> >         private static String col_family1="stocks";
> >         private static String qul1="symbol";
> >
> >         private static byte[] colfamily2=Bytes.toBytes(col_family);
> >         private static byte[] qul2=Bytes.toBytes(qul);
> >
> >         private static byte[] colfamily3=Bytes.toBytes(col_family1);
> >         private static byte[] qul3=Bytes.toBytes(qul1);
> >
> > //        public static float toFloat(int qul2)
> > //        {
> > //            return Float.intBitsToFloat((qul2));
> > //
> > //        }
> > //
> >         private static Text k1=new Text();
> >
> >
> >
> >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
> >         {
> >
> >
> >             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
> >             //String k=Bytes.toString(value.getValue(Bytes.toBytes("stocks"),Bytes.toBytes("symbol")));
> >            byte[] val=value.getValue(colfamily2,qul2);
> >           String k=Bytes.toString(value.getValue(colfamily3,qul3));
> >
> >             //ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable(qul3);
> >
> >         k1.set(k);
> >
> >             try
> >             {
> >
> >                 context.write(k1,new FloatWritable(Float.parseFloat(Bytes.toString(val))));
> >
> >             }
> >
> >             catch(InterruptedException e)
> >
> >             {
> >                  throw new IOException(e);
> >             }
> >
> >
> >         }
> >
> >
> >     }
> >
> >
> >     public static class Reduce extends Reducer<Text,FloatWritable,Text,FloatWritable>
> >     {
> >
> >         public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
> >
> >         {
> >             float sum=0;
> >             int count=0;
> >             float average=0;
> >             for(FloatWritable val:values)
> >             {
> >                 sum+=val.get();
> >                 count++;
> >             }
> >             average=(sum/count);
> > //            Put put=new Put(key.getBytes());
> > //            put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
> >
> >             System.out.println("For\t"+count+"\t average is:"+average);
> >             context.write(key,new FloatWritable(average));
> >
> >
> >         }
> >
> >     }
> >
> >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
> >     {
> >         Configuration config=HBaseConfiguration.create();
> >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
> >         Job job=new Job(config,"openstockaverage1");
> >
> >
> >         Scan scan=new Scan();
> >         scan.addFamily("stocks".getBytes());
> >         scan.setFilter(new FirstKeyOnlyFilter());
> >
> >         TableMapReduceUtil.initTableMapperJob("nyse4",
> >                 scan,
> >                 map.class,
> >                 ImmutableBytesWritable.class,
> >                 FloatWritable.class,
> >                 job);
> >
> > //        TableMapReduceUtil.initTableReducerJob("nyse5",
> > //                Reduce.class,
> > //                job);
> >     job.setReducerClass(Reduce.class);
> >
> >         FileOutputFormat.setOutputPath(job, new Path(
> >                 "hdfs://localhost:54310/user/manish/Full_final_output_5"));
> >         job.waitForCompletion(true);
> >     }
> >
> >
> >
> > }
> >
> >
> > *It throws this error:*
> >
> > 13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001
> > 13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0
> > 13/08/17 19:37:59 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@bd96dd
> > 13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100
> > 13/08/17 19:38:00 INFO mapred.JobClient:  map 0% reduce 0%
> > 13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720
> > 13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680
> > 13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001
> > java.lang.NullPointerException
> >     at org.apache.hadoop.io.Text.encode(Text.java:388)
> >     at org.apache.hadoop.io.Text.set(Text.java:178)
> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:59)
> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
> > 13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001
> > 13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0
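
A plausible reading of this trace (a sketch, not a confirmed diagnosis): line 59 is
the k1.set(k) call, and Text.set() throws a NullPointerException when its argument
is null. Bytes.toString(null) returns null, so k is null whenever the Result has no
stocks:symbol cell. With scan.setFilter(new FirstKeyOnlyFilter()) each Result
carries only the first KeyValue of the row, and stocks:open sorts before
stocks:symbol, so the symbol cell is never present. A minimal fix sketch, using the
names from the code above:

    // Remove the FirstKeyOnlyFilter from the Scan (this mapper needs two
    // columns per row), then guard against missing cells:
    byte[] val = value.getValue(colfamily2, qul2);   // stocks:open
    byte[] sym = value.getValue(colfamily3, qul3);   // stocks:symbol
    if (val == null || sym == null) {
        return;   // skip rows that lack either column
    }
    k1.set(Bytes.toString(sym));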
> >
> >
> >
> >
> >
> > On Sun, Aug 18, 2013 at 7:55 AM, manish dunani <manishd207@gmail.com> wrote:
> >
> >> *Here is the output table:*
> >>
> >>>
> >>> hbase(main):004:0> scan 'nyse5'
> >>> ROW                   COLUMN+CELL
> >>>  symbol               column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
> >>>
> >>
> >>
> >>
> >>
> >> *Sample output in my Eclipse:*
> >>
> >> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
> >> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
> >> 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
> >> 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
> >> *For    2640    average is:6.201058*
> >>
> >>
> >> On Sat, Aug 17, 2013 at 11:59 PM, Jean-Marc Spaggiari <jean-marc@spaggiari.org> wrote:
> >>
> >>> Are you outputting to a table? From your code, I don't see any output
> >>> configured.
> >>>
> >>> 2013/8/17 manish dunani <manishd207@gmail.com>
> >>>
> >>> > Thanks a lot, Jean!
> >>> >
> >>> > I am very thankful to you, and of course Ted is also doing a very good job.
> >>> >
> >>> > *Revised code:*
> >>> >
> >>> > > package com.maddy;
> >>> > >
> >>> > > import java.io.IOException;
> >>> > >
> >>> > > import org.apache.hadoop.conf.Configuration;
> >>> > > import org.apache.hadoop.fs.Path;
> >>> > > import org.apache.hadoop.hbase.HBaseConfiguration;
> >>> > > import org.apache.hadoop.hbase.client.Put;
> >>> > > import org.apache.hadoop.hbase.client.Result;
> >>> > > import org.apache.hadoop.hbase.client.Scan;
> >>> > > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> >>> > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> >>> > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> >>> > > import org.apache.hadoop.hbase.mapreduce.TableMapper;
> >>> > > import org.apache.hadoop.hbase.mapreduce.TableReducer;
> >>> > > import org.apache.hadoop.hbase.util.Bytes;
> >>> > > //import org.apache.hadoop.io.DoubleWritable;
> >>> > > import org.apache.hadoop.io.FloatWritable;
> >>> > > import org.apache.hadoop.mapreduce.Job;
> >>> > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> >>> > >
> >>> > >
> >>> > > public class openaveragestock
> >>> > > {
> >>> > >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
> >>> > >     {
> >>> > >         private static String col_family="stocks";
> >>> > >         private static String qul="open";
> >>> > >
> >>> > >         private static String col_family1="stocks";
> >>> > >         private static String qul1="symbol";
> >>> > >
> >>> > >         private static byte[] colfamily2=Bytes.toBytes(col_family);
> >>> > >         private static byte[] qul2=Bytes.toBytes(qul);
> >>> > >
> >>> > >         private static byte[] colfamily3=Bytes.toBytes(col_family1);
> >>> > >         private static byte[] qul3=Bytes.toBytes(qul1);
> >>> > >
> >>> > > //        public static float toFloat(int qul2)
> >>> > > //        {
> >>> > > //            return Float.intBitsToFloat((qul2));
> >>> > > //
> >>> > > //        }
> >>> > > //
> >>> > >
> >>> > >
> >>> > >
> >>> > >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
> >>> > >         {
> >>> > >
> >>> > >
> >>> > >             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
> >>> > >            byte[] val=value.getValue(colfamily2,qul2);
> >>> > >
> >>> > >
> >>> > >             ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable(qul3);
> >>> > >
> >>> > >
> >>> > >
> >>> > >             try
> >>> > >             {
> >>> > >
> >>> > >                 context.write(stock_symbol,new FloatWritable(Float.parseFloat(Bytes.toString(val))));
> >>> > >             }
> >>> > >
> >>> > >             catch(InterruptedException e)
> >>> > >
> >>> > >             {
> >>> > >                  throw new IOException(e);
> >>> > >             }
> >>> > >
> >>> > >
> >>> > >         }
> >>> > >
> >>> > >
> >>> > >     }
> >>> > >
> >>> > >
> >>> > >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
> >>> > >     {
> >>> > >
> >>> > >         @Override
> >>> > >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
> >>> > >         {
> >>> > >             float sum=0;
> >>> > >             int count=0;
> >>> > >             float average=0;
> >>> > >             for(FloatWritable val:values)
> >>> > >             {
> >>> > >                 sum+=val.get();
> >>> > >                 count++;
> >>> > >             }
> >>> > >             average=(sum/count);
> >>> > >             Put put=new Put(key.get());
> >>> > >
> >>> > >             put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
> >>> > >             System.out.println("For\t"+count+"\t average is:"+average);
> >>> > >             context.write(key,put);
> >>> > >
> >>> > >         }
> >>> > >
> >>> > >     }
> >>> > >
> >>> > >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
> >>> > >     {
> >>> > >         Configuration config=HBaseConfiguration.create();
> >>> > >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
> >>> > >         Job job=new Job(config,"openstockaverage1");
> >>> > >
> >>> > >
> >>> > >         Scan scan=new Scan();
> >>> > >         scan.addFamily("stocks".getBytes());
> >>> > >         scan.setFilter(new FirstKeyOnlyFilter());
> >>> > >
> >>> > >         TableMapReduceUtil.initTableMapperJob("nyse4",
> >>> > >                 scan,
> >>> > >                 map.class,
> >>> > >                 ImmutableBytesWritable.class,
> >>> > >                 FloatWritable.class,
> >>> > >                 job);
> >>> > >
> >>> > >         TableMapReduceUtil.initTableReducerJob("nyse5",
> >>> > >                 reduce.class,
> >>> > >                 job);
> >>> > >         //job.setReducerClass(reduce.class);
> >>> > >
> >>> > >         //FileOutputFormat.setOutputPath(job, new Path(
> >>> > >         //        "hdfs://localhost:54310/user/manish/final_hbase_hadoop"));
> >>> > >         job.waitForCompletion(true);
> >>> > >     }
> >>> > >
> >>> > >
> >>> > >
> >>> > > }
> >>> > >
> >>> > >
> >>> > *Sample output in my Eclipse:*
> >>> >
> >>> > > 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
> >>> > > 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
> >>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
> >>> > > 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
> >>> > > *For    2640    average is:6.201058*
> >>> > > 13/08/17 07:27:21 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
> >>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce
> >>> > > 13/08/17 07:27:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
> >>> > > 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is null in cleanup
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:  map 100% reduce 100%
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   HBase Counters
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_CALLS=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_CALLS=2643
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_RETRIES=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=5849
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_RESULTS=126719
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REGIONS_SCANNED=1
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Output Format Counters
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Written=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   FileSystemCounters
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_READ=24176810
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=24552208
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Input Format Counters
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Read=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   Map-Reduce Framework
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input groups=1
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output materialized bytes=42246
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine output records=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map input records=2640
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce shuffle bytes=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce output records=1
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Spilled Records=5280
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output bytes=36960
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     CPU time spent (ms)=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Total committed heap usage (bytes)=321527808
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine input records=0
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output records=2640
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     SPLIT_RAW_BYTES=65
> >>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input records=2640
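
Note how these counters line up with the mapper's constant key: Map output
records=2640 collapse into Reduce input groups=1 because every map() call wrote
the same key:

    // From the map() above: qul3 is always the literal bytes "symbol",
    // so all 2640 open values land in a single reduce group.
    ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable(qul3);
    context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));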
> >>> > >
> >>> >
> >>> > *Question* (please don't laugh at me if I ask a silly question):
> >>> >
> >>> > Here in the code I set an output directory, but when I look in the HDFS
> >>> > directory it does not contain any part-0000 file; it contains only a
> >>> > SUCCESS file.
> >>> > Can I ask why that happens?
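
One likely answer (hedged, based on how TableMapReduceUtil behaves):
initTableReducerJob configures TableOutputFormat as the job's output format, so
the reducer's Puts go into the 'nyse5' table rather than into files; a path set
via FileOutputFormat then receives at most the job's SUCCESS marker, never
part-r-00000 files:

    // Sketch: with a table reducer, output goes to HBase, not HDFS.
    TableMapReduceUtil.initTableReducerJob("nyse5", reduce.class, job);
    // TableOutputFormat replaces the file output format here, so any path
    // given to FileOutputFormat.setOutputPath(...) gets no part files.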
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari <jean-marc@spaggiari.org> wrote:
> >>> >
> >>> > > Hi Manish.
> >>> > >
> >>> > > Looking a bit more at this, I think the issue is that your floats are
> >>> > > written in your table as strings and not as floats...
> >>> > >
> >>> > > Can you try something like:
> >>> > > context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
> >>> > >
> >>> > > Also, as asked previously: "Can you please paste your code on pastebin?
> >>> > > Same for the exception."
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > JM
> >>> > >
> >>> > > 2013/8/17 manish dunani <manishd207@gmail.com>
> >>> > >
> >>> > > > Hey Jean,
> >>> > > >
> >>> > > > I did it according to your suggestion; I converted it as you told me,
> >>> > > > but I still face the same error.
> >>> > > >
> >>> > > > And Ted, I am new to this and don't have an idea of how to use this
> >>> > > > method. Can you please show me?
> >>> > > >
> >>> > > > Your help will be appreciated.
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> >>> > > >
> >>> > > > > Here is javadoc for toFloat():
> >>> > > > >
> >>> > > > >    * Presumes float encoded as IEEE 754 floating-point "single format"
> >>> > > > >
> >>> > > > >    * @param bytes byte array
> >>> > > > >
> >>> > > > >    * @return Float made from passed byte array.
> >>> > > > >
> >>> > > > >    */
> >>> > > > >
> >>> > > > >   public static float toFloat(byte [] bytes) {
> >>> > > > >
> >>> > > > > So for values like '2.5', toFloat() is not the proper method.
> >>> > > > >
> >>> > > > > You can use the float conversion provided by Java.
> >>> > > > >
> >>> > > > >
> >>> > > > > Cheers
> >>> > > > >
> >>> > > > >
> >>> > > > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> >>> > > > >
> >>> > > > > > Here is code from Bytes:
> >>> > > > > >
> >>> > > > > >   public static float toFloat(byte [] bytes, int offset) {
> >>> > > > > >
> >>> > > > > >     return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
> >>> > > > > >
> >>> > > > > > Looking at your sample data:
> >>> > > > > >
> >>> > > > > >  2010-02-04           column=stocks:open, timestamp=1376567559424, value=*2.5*
> >>> > > > > >
> >>> > > > > > The length of value didn't match SIZEOF_INT.
> >>> > > > > >
> >>> > > > > > It seems you need to validate the values first.
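
To spell the mismatch out (a small sketch using the HBase Bytes utility):

    byte[] raw = Bytes.toBytes("2.5");   // the string "2.5": 3 bytes {0x32, 0x2E, 0x35}
    // Bytes.toFloat(raw) throws here: it wants exactly 4 bytes of IEEE 754 bits,
    // hence "offset (0) + length (4) exceed the capacity of the array: 3".
    float open = Float.parseFloat(Bytes.toString(raw));   // parse the text instead -> 2.5f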
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > Cheers
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani <manishd207@gmail.com> wrote:
> >>> > > > > >
> >>> > > > > >> Hello,
> >>> > > > > >>
> >>> > > > > >> I am using Apache Hadoop 1.1.2 and HBase 0.94.9 in pseudo-distributed mode.
> >>> > > > > >>
> >>> > > > > >> I am trying to find the average of the open stock values.
> >>> > > > > >>
> >>> > > > > >> *Sample dataset in HBase* (table name: nyse4):
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>  2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
> >>> > > > > >>  2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
> >>> > > > > >>  2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
> >>> > > > > >>  2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM
> >>> > > > > >>  2010-02-08           column=stocks:open, timestamp=1376567559431, value=2.33
> >>> > > > > >>  2010-02-08           column=stocks:symbol, timestamp=1376567559431, value=QXM
> >>> > > > > >>
> >>> > > > > >> *Code:* (please ignore the commented-out lines)
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >> > package com.maddy;
> >>> > > > > >> >
> >>> > > > > >> > import java.io.IOException;
> >>> > > > > >> >
> >>> > > > > >> > import org.apache.hadoop.conf.Configuration;
> >>> > > > > >> > import org.apache.hadoop.fs.Path;
> >>> > > > > >> > import org.apache.hadoop.hbase.HBaseConfiguration;
> >>> > > > > >> > import org.apache.hadoop.hbase.client.Put;
> >>> > > > > >> > import org.apache.hadoop.hbase.client.Result;
> >>> > > > > >> > import org.apache.hadoop.hbase.client.Scan;
> >>> > > > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> >>> > > > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> >>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> >>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
> >>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
> >>> > > > > >> > import org.apache.hadoop.hbase.util.Bytes;
> >>> > > > > >> > //import org.apache.hadoop.io.DoubleWritable;
> >>> > > > > >> > import org.apache.hadoop.io.FloatWritable;
> >>> > > > > >> > import org.apache.hadoop.mapreduce.Job;
> >>> > > > > >> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> > public class openaveragestock
> >>> > > > > >> > {
> >>> > > > > >> >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
> >>> > > > > >> >     {
> >>> > > > > >> >         @Override
> >>> > > > > >> >
> >>> > > > > >> >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
> >>> > > > > >> >         {
> >>> > > > > >> >
> >>> > > > > >> >             byte[] val=(value.getValue("stocks".getBytes(),"open".getBytes()));
> >>> > > > > >> >             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >             ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable("symbol".getBytes());
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >             try
> >>> > > > > >> >             {
> >>> > > > > >> >                 context.write(stock_symbol,new FloatWritable(Bytes.toFloat(val)));
> >>> > > > > >> >             }
> >>> > > > > >> >             catch(InterruptedException e)
> >>> > > > > >> >
> >>> > > > > >> >             {
> >>> > > > > >> >                  throw new IOException(e);
> >>> > > > > >> >             }
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >         }
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >     }
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
> >>> > > > > >> >     {
> >>> > > > > >> >
> >>> > > > > >> >         @Override
> >>> > > > > >> >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
> >>> > > > > >> >         {
> >>> > > > > >> >             float sum=0;
> >>> > > > > >> >             int count=0;
> >>> > > > > >> >           //  float average=0;
> >>> > > > > >> >             for(FloatWritable val:values)
> >>> > > > > >> >             {
> >>> > > > > >> >                 sum+=val.get();
> >>> > > > > >> >                 count++;
> >>> > > > > >> >             }
> >>> > > > > >> >             //average=(sum/count);
> >>> > > > > >> >             Put put=new Put(key.get());
> >>> > > > > >> >             put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(sum/count));
> >>> > > > > >> >             System.out.println("For\t"+count+"\t average is:"+(sum/count));
> >>> > > > > >> >             context.write(key,put);
> >>> > > > > >> >
> >>> > > > > >> >         }
> >>> > > > > >> >
> >>> > > > > >> >     }
> >>> > > > > >> >
> >>> > > > > >> >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
> >>> > > > > >> >     {
> >>> > > > > >> >         Configuration config=HBaseConfiguration.create();
> >>> > > > > >> >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
> >>> > > > > >> >         Job job=new Job(config,"openstockaverage1");
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >         Scan scan=new Scan();
> >>> > > > > >> >         scan.addFamily("stocks".getBytes());
> >>> > > > > >> >         scan.setFilter(new FirstKeyOnlyFilter());
> >>> > > > > >> >
> >>> > > > > >> >         TableMapReduceUtil.initTableMapperJob("nyse4",
> >>> > > > > >> >                 scan,
> >>> > > > > >> >                 map.class,
> >>> > > > > >> >                 ImmutableBytesWritable.class,
> >>> > > > > >> >                 FloatWritable.class,
> >>> > > > > >> >                 job);
> >>> > > > > >> >
> >>> > > > > >> >         TableMapReduceUtil.initTableReducerJob("nyse5",
> >>> > > > > >> >                 reduce.class,
> >>> > > > > >> >                 job);
> >>> > > > > >> >         //job.setReducerClass(reduce.class);
> >>> > > > > >> >
> >>> > > > > >> >         FileOutputFormat.setOutputPath(job, new Path(
> >>> > > > > >> >                 "hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
> >>> > > > > >> >         job.waitForCompletion(true);
> >>> > > > > >> >     }
> >>> > > > > >> >
> >>> > > > > >> > }
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> > *===> Got stuck on this error:*
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
> >>> > > > > >> > 13/08/16 03:21:46 INFO mapred.JobClient:  map 0% reduce 0%
> >>> > > > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
> >>> > > > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
> >>> > > > > >> > 13/08/16 03:21:47 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
> >>> > > > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
> >>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
> >>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
> >>> > > > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
> >>> > > > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
> >>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
> >>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
> >>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
> >>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
> >>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
> >>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
> >>> > > > > >> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> >>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
> >>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> >>> > > > > >> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
> >>> > > > > >> >
> >>> > > > > >> >
> >>> > > > > >> I cannot find where it fails.
> >>> > > > > >> Can you please tell me where I went wrong?
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >> Your help will be appreciated.
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >>
> >>> > > > > >> --
> >>> > > > > >> Regards
> >>> > > > > >>
> >>> > > > > >> *Manish Dunani*
> >>> > > > > >> *Contact No* : +91 9408329137
> >>> > > > > >> *skype id* : manish.dunani
> >>> > > > > >>
> >>> > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > --
> >>> > > > Regards
> >>> > > >
> >>> > > > *Manish Dunani*
> >>> > > > *Contact No* : +91 9408329137
> >>> > > > *skype id* : manish.dunani
> >>> > > >
> >>> > >
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Regards
> >>> >
> >>> > *Manish Dunani*
> >>> > *Contact No* : +91 9408329137
> >>> > *skype id* : manish.dunani
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> Regards
> >>
> >> *Manish Dunani*
> >> *Contact No* : +91 9408329137
> >> *skype id* : manish.dunani
> >>
> >>
> >>
> >
> >
> > --
> > Regards
> >
> > *Manish Dunani*
> > *Contact No* : +91 9408329137
> > *skype id* : manish.dunani
> >
> >
> >
>
>
> --
> Regards
>
> *Manish Dunani*
> *Contact No* : +91 9408329137
> *skype id* : manish.dunani
>
