hbase-user mailing list archives

From manish dunani <manishd...@gmail.com>
Subject Re: [Error]Finding average using hbase hadoop
Date Mon, 19 Aug 2013 12:32:59 GMT
Hello Jean,

Did you find it?


On Sun, Aug 18, 2013 at 8:28 AM, manish dunani <manishd207@gmail.com> wrote:

> But I want my output to look like this:
>
>
> ROW                   COLUMN+CELL
>  QXM                  column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
>  QTM                  column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
>
>
> *Sample dataset in HBase* (table name: nyse4):
>
>
>  2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
>  2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
>  2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
>  2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM
>
>
> ===>> In my previous output I didn't get the symbol (qualifier) values as
> the row key in my table:
>
>
>
> hbase(main):004:0> scan 'nyse5'
> ROW                  COLUMN+CELL
>  symbol              column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
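(The shell is printing the raw cell bytes here: @\xC6o\x11 is 0x40C66F11, the four-byte IEEE 754 encoding that Bytes.toBytes(float) produces for 6.201058, the average printed later in this thread. A quick check with the HBase Bytes utility:

    float avg = Bytes.toFloat(new byte[] { 0x40, (byte) 0xC6, 0x6F, 0x11 }); // 6.201058f

Storing the value as a string instead, e.g. Bytes.toBytes(Float.toString(average)), would make the scan output human-readable.)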
>
> *So I changed my program as follows:*
>
>
> package com.maddy;
>
> import java.io.IOException;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableMapper;
> import org.apache.hadoop.hbase.util.Bytes;
> //import org.apache.hadoop.io.DoubleWritable;
> import org.apache.hadoop.io.FloatWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>
>
> public class openaveragestock
> {
>     public static class map extends TableMapper<Text,FloatWritable>
>     {
>         private static String col_family="stocks";
>         private static String qul="open";
>
>         private static String col_family1="stocks";
>         private static String qul1="symbol";
>
>         private static byte[] colfamily2=Bytes.toBytes(col_family);
>         private static byte[] qul2=Bytes.toBytes(qul);
>
>         private static byte[] colfamily3=Bytes.toBytes(col_family1);
>         private static byte[] qul3=Bytes.toBytes(qul1);
>
> //        public static float toFloat(int qul2)
> //        {
> //            return Float.intBitsToFloat((qul2));
> //        }
>
>         private static Text k1=new Text();
>
>         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>         {
>             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
>             //String k=Bytes.toString(value.getValue(Bytes.toBytes("stocks"),Bytes.toBytes("symbol")));
>             byte[] val=value.getValue(colfamily2,qul2);
>             String k=Bytes.toString(value.getValue(colfamily3,qul3));
>
>             //ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable(qul3);
>
>             k1.set(k);
>
>             try
>             {
>                 context.write(k1, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>             }
>             catch(InterruptedException e)
>             {
>                 throw new IOException(e);
>             }
>         }
>     }
>
>     public static class Reduce extends Reducer<Text,FloatWritable,Text,FloatWritable>
>     {
>         public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>         {
>             float sum=0;
>             int count=0;
>             float average=0;
>             for(FloatWritable val : values)
>             {
>                 sum+=val.get();
>                 count++;
>             }
>             average=(sum/count);
> //            Put put=new Put(key.getBytes());
> //            put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
>
>             System.out.println("For\t"+count+"\t average is:"+average);
>             context.write(key, new FloatWritable(average));
>         }
>     }
>
>     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>     {
>         Configuration config=HBaseConfiguration.create();
>         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>         Job job=new Job(config,"openstockaverage1");
>
>         Scan scan=new Scan();
>         scan.addFamily("stocks".getBytes());
>         scan.setFilter(new FirstKeyOnlyFilter());
>
>         TableMapReduceUtil.initTableMapperJob("nyse4",
>                 scan,
>                 map.class,
>                 ImmutableBytesWritable.class,
>                 FloatWritable.class,
>                 job);
>
> //        TableMapReduceUtil.initTableReducerJob("nyse5",
> //                Reduce.class,
> //                job);
>         job.setReducerClass(Reduce.class);
>
>         FileOutputFormat.setOutputPath(job, new Path(
>                 "hdfs://localhost:54310/user/manish/Full_final_output_5"));
>         job.waitForCompletion(true);
>     }
> }
>
>
> *It throws this error:*
>
> 13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001
> 13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0
> 13/08/17 19:37:59 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@bd96dd
> 13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100
> 13/08/17 19:38:00 INFO mapred.JobClient:  map 0% reduce 0%
> 13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720
> 13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680
> 13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001
> java.lang.NullPointerException
>     at org.apache.hadoop.io.Text.encode(Text.java:388)
>     at org.apache.hadoop.io.Text.set(Text.java:178)
>     at com.maddy.openaveragestock$map.map(openaveragestock.java:59)
>     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
> 13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001
> 13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0
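The trace points at Text.set: Bytes.toString(value.getValue(colfamily3, qul3)) returned null, so k1.set(k) threw. The likely cause is the FirstKeyOnlyFilter on the scan: it keeps only the first KeyValue of every row, so the stocks:symbol cell never reaches the mapper. A minimal null-safe map body, assuming that filter is removed from the scan (a sketch, not a tested fix):

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException {
        byte[] val = value.getValue(colfamily2, qul2);   // stocks:open
        byte[] sym = value.getValue(colfamily3, qul3);   // stocks:symbol
        if (val == null || sym == null) {
            return;                                      // skip rows missing either cell
        }
        k1.set(Bytes.toString(sym));
        try {
            context.write(k1, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

Note also that initTableMapperJob is passed ImmutableBytesWritable.class as the map output key class while the mapper is declared TableMapper<Text,FloatWritable>; once the NPE is gone, that mismatch would surface as a "Type mismatch in key from map" error unless Text.class is passed instead.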
>
>
>
>
>
> On Sun, Aug 18, 2013 at 7:55 AM, manish dunani <manishd207@gmail.com> wrote:
>
>> *Here is the output table:*
>>>
>>> hbase(main):004:0> scan 'nyse5'
>>> ROW                  COLUMN+CELL
>>>  symbol              column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
>>>
>>
>>
>>
>>
>> *Sample output in my Eclipse console:*
>>
>> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
>> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
>> 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
>> 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
>> *For    2640    average is:6.201058*
>>
>>
>> On Sat, Aug 17, 2013 at 11:59 PM, Jean-Marc Spaggiari <
>> jean-marc@spaggiari.org> wrote:
>>
>>> Are you outputting to a table? From your code, I don't see any output
>>> configured.
>>>
>>> 2013/8/17 manish dunani <manishd207@gmail.com>
>>>
>>> > Thanks a lot, Jean!
>>> > I am very thankful to you, and of course Ted is also doing a very good job.
>>> >
>>> > *Revised code:*
>>> >
>>> > > package com.maddy;
>>> > >
>>> > > import java.io.IOException;
>>> > >
>>> > > import org.apache.hadoop.conf.Configuration;
>>> > > import org.apache.hadoop.fs.Path;
>>> > > import org.apache.hadoop.hbase.HBaseConfiguration;
>>> > > import org.apache.hadoop.hbase.client.Put;
>>> > > import org.apache.hadoop.hbase.client.Result;
>>> > > import org.apache.hadoop.hbase.client.Scan;
>>> > > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>>> > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>>> > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>>> > > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>>> > > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>>> > > import org.apache.hadoop.hbase.util.Bytes;
>>> > > //import org.apache.hadoop.io.DoubleWritable;
>>> > > import org.apache.hadoop.io.FloatWritable;
>>> > > import org.apache.hadoop.mapreduce.Job;
>>> > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>>> > >
>>> > >
>>> > > public class openaveragestock
>>> > > {
>>> > >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
>>> > >     {
>>> > >         private static String col_family="stocks";
>>> > >         private static String qul="open";
>>> > >
>>> > >         private static String col_family1="stocks";
>>> > >         private static String qul1="symbol";
>>> > >
>>> > >         private static byte[] colfamily2=Bytes.toBytes(col_family);
>>> > >         private static byte[] qul2=Bytes.toBytes(qul);
>>> > >
>>> > >         private static byte[] colfamily3=Bytes.toBytes(col_family1);
>>> > >         private static byte[] qul3=Bytes.toBytes(qul1);
>>> > >
>>> > > //        public static float toFloat(int qul2)
>>> > > //        {
>>> > > //            return Float.intBitsToFloat((qul2));
>>> > > //        }
>>> > >
>>> > >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>>> > >         {
>>> > >             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
>>> > >             byte[] val=value.getValue(colfamily2,qul2);
>>> > >
>>> > >             ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable(qul3);
>>> > >
>>> > >             try
>>> > >             {
>>> > >                 context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>>> > >             }
>>> > >             catch(InterruptedException e)
>>> > >             {
>>> > >                 throw new IOException(e);
>>> > >             }
>>> > >         }
>>> > >     }
>>> > >
>>> > >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>>> > >     {
>>> > >         @Override
>>> > >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>>> > >         {
>>> > >             float sum=0;
>>> > >             int count=0;
>>> > >             float average=0;
>>> > >             for(FloatWritable val : values)
>>> > >             {
>>> > >                 sum+=val.get();
>>> > >                 count++;
>>> > >             }
>>> > >             average=(sum/count);
>>> > >             Put put=new Put(key.get());
>>> > >             put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
>>> > >             System.out.println("For\t"+count+"\t average is:"+average);
>>> > >             context.write(key,put);
>>> > >         }
>>> > >     }
>>> > >
>>> > >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>>> > >     {
>>> > >         Configuration config=HBaseConfiguration.create();
>>> > >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>>> > >         Job job=new Job(config,"openstockaverage1");
>>> > >
>>> > >         Scan scan=new Scan();
>>> > >         scan.addFamily("stocks".getBytes());
>>> > >         scan.setFilter(new FirstKeyOnlyFilter());
>>> > >
>>> > >         TableMapReduceUtil.initTableMapperJob("nyse4",
>>> > >                 scan,
>>> > >                 map.class,
>>> > >                 ImmutableBytesWritable.class,
>>> > >                 FloatWritable.class,
>>> > >                 job);
>>> > >
>>> > >         TableMapReduceUtil.initTableReducerJob("nyse5",
>>> > >                 reduce.class,
>>> > >                 job);
>>> > >         //job.setReducerClass(reduce.class);
>>> > >
>>> > >         //FileOutputFormat.setOutputPath(job, new Path(
>>> > >         //        "hdfs://localhost:54310/user/manish/final_hbase_hadoop"));
>>> > >         job.waitForCompletion(true);
>>> > >     }
>>> > > }
>>> > >
>>> > >
>>> > *Sample output in my Eclipse console:*
>>> >
>>> > > 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
>>> > > 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
>>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
>>> > > 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
>>> > > *For    2640    average is:6.201058*
>>> > > 13/08/17 07:27:21 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
>>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce
>>> > > 13/08/17 07:27:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
>>> > > 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is null in cleanup
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:  map 100% reduce 100%
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   HBase Counters
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_CALLS=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_CALLS=2643
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_RETRIES=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=5849
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_RESULTS=126719
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REGIONS_SCANNED=1
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Output Format Counters
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Written=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   FileSystemCounters
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_READ=24176810
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=24552208
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Input Format Counters
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Read=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   Map-Reduce Framework
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input groups=1
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output materialized bytes=42246
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine output records=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map input records=2640
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce shuffle bytes=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce output records=1
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Spilled Records=5280
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output bytes=36960
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     CPU time spent (ms)=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Total committed heap usage (bytes)=321527808
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine input records=0
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output records=2640
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     SPLIT_RAW_BYTES=65
>>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input records=2640
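Two counters above tell the story: Map input records=2640 but Reduce input groups=1. The mapper keys every record with the constant bytes of the qualifier name ("symbol", via new ImmutableBytesWritable(qul3)) rather than with the cell's value, so all 2640 opens collapse into the single row 'symbol' seen in the nyse5 scan. Keying on the symbol value itself, as the newest code at the top of the thread attempts, is what would yield one average per ticker (QXM, QTM, ...).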
>>> > >
>>> >
>>> > *Question* (please don't laugh at me if I ask a silly question):
>>> >
>>> > In the code I set an output directory, but when I look in the HDFS
>>> > directory it does not contain any part-0000 file; it contains only a
>>> > SUCCESS file.
>>> > Can I ask why this happens?
>>> >
>>> >
>>> >
>>> >
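On the part-file question: after initTableReducerJob, TableOutputFormat is the job's output format and the reducer's Puts go to the nyse5 table, not to HDFS. Part files only appear when records flow through a file-based output format such as TextOutputFormat, so the path given to FileOutputFormat.setOutputPath stays empty apart from the job-completion marker; the actual averages are in the table, which the 'scan nyse5' output above confirms. (In the run logged above, the path was commented out altogether, hence the "Output path is null in cleanup" warning.)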
>>> > On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari <
>>> > jean-marc@spaggiari.org> wrote:
>>> >
>>> > > Hi Manish.
>>> > >
>>> > > Looking a bit more at this, I think the issue is that your floats are
>>> > > written in your table as strings and not as floats....
>>> > >
>>> > > Can you try something like:
>>> > > context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>>> > >
>>> > > Also, as asked previously, "can you please paste your code on pastebin?
>>> > > Same for the exception."
>>> > >
>>> > > Thanks,
>>> > >
>>> > > JM
>>> > >
>>> > > 2013/8/17 manish dunani <manishd207@gmail.com>
>>> > >
>>> > > > Hey Jean,
>>> > > > I did it as you suggested.
>>> > > > I converted it as you told me, but I still face the same error.
>>> > > >
>>> > > > And Ted, I am new to this and don't see how I can use this method. Can
>>> > > > you please show me?
>>> > > >
>>> > > > Your Help will be appreciated..
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <yuzhihong@gmail.com>
>>> wrote:
>>> > > >
>>> > > > > Here is the javadoc for toFloat():
>>> > > > >
>>> > > > >    * Presumes float encoded as IEEE 754 floating-point "single format"
>>> > > > >    * @param bytes byte array
>>> > > > >    * @return Float made from passed byte array.
>>> > > > >    */
>>> > > > >   public static float toFloat(byte [] bytes) {
>>> > > > >
>>> > > > > So for values of '2.5', toFloat() is not the proper method.
>>> > > > >
>>> > > > > You can use the float conversion provided by Java.
>>> > > > >
>>> > > > >
>>> > > > > Cheers
>>> > > > >
>>> > > > >
>>> > > > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <yuzhihong@gmail.com>
>>> wrote:
>>> > > > >
>>> > > > > > Here is the code from Bytes:
>>> > > > > >
>>> > > > > >   public static float toFloat(byte [] bytes, int offset) {
>>> > > > > >     return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
>>> > > > > >
>>> > > > > > Looking at your sample data:
>>> > > > > >
>>> > > > > >  2010-02-04           column=stocks:open, timestamp=1376567559424, value=*2.5*
>>> > > > > >
>>> > > > > > The length of the value didn't match SIZEOF_INT.
>>> > > > > >
>>> > > > > > It seems you need to validate the values first.
>>> > > > > >
>>> > > > > >
>>> > > > > > Cheers
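To make this concrete: Bytes.toFloat() expects the four raw IEEE 754 bytes that Bytes.toBytes(float) writes, while a value that was loaded as text must go through Float.parseFloat. A small sketch of the difference (illustrative values, not taken from the table):

    byte[] asString = Bytes.toBytes("2.5");   // 3 bytes: '2', '.', '5'
    byte[] asFloat  = Bytes.toBytes(2.5f);    // 4 bytes: IEEE 754, 0x40200000

    float a = Float.parseFloat(Bytes.toString(asString)); // 2.5f - right for string cells
    float b = Bytes.toFloat(asFloat);                     // 2.5f - right for float cells
    // Bytes.toFloat(asString) throws IllegalArgumentException: 3 bytes, not 4

Since the nyse4 loader stored the opens as text, Float.parseFloat(Bytes.toString(val)) is the conversion that matches the data.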
>>> > > > > >
>>> > > > > >
>>> > > > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani <
>>> > manishd207@gmail.com
>>> > > > > >wrote:
>>> > > > > >
>>> > > > > >> Hello,
>>> > > > > >>
>>> > > > > >> I am using Apache Hadoop 1.1.2 and HBase 0.94.9 in pseudo-distributed mode.
>>> > > > > >>
>>> > > > > >> I am trying to find the average open stock values.
>>> > > > > >>
>>> > > > > >> *Sample dataset in HBase* (table name: nyse4):
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>  2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
>>> > > > > >>  2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
>>> > > > > >>  2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
>>> > > > > >>  2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM
>>> > > > > >>  2010-02-08           column=stocks:open, timestamp=1376567559431, value=2.33
>>> > > > > >>  2010-02-08           column=stocks:symbol, timestamp=1376567559431, value=QXM
>>> > > > > >>
>>> > > > > >> *Code:* (please ignore the commented-out lines)
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> > package com.maddy;
>>> > > > > >> >
>>> > > > > >> > import java.io.IOException;
>>> > > > > >> >
>>> > > > > >> > import org.apache.hadoop.conf.Configuration;
>>> > > > > >> > import org.apache.hadoop.fs.Path;
>>> > > > > >> > import org.apache.hadoop.hbase.HBaseConfiguration;
>>> > > > > >> > import org.apache.hadoop.hbase.client.Put;
>>> > > > > >> > import org.apache.hadoop.hbase.client.Result;
>>> > > > > >> > import org.apache.hadoop.hbase.client.Scan;
>>> > > > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>>> > > > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>>> > > > > >> > import org.apache.hadoop.hbase.util.Bytes;
>>> > > > > >> > //import org.apache.hadoop.io.DoubleWritable;
>>> > > > > >> > import org.apache.hadoop.io.FloatWritable;
>>> > > > > >> > import org.apache.hadoop.mapreduce.Job;
>>> > > > > >> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>>> > > > > >> >
>>> > > > > >> >
>>> > > > > >> > public class openaveragestock
>>> > > > > >> > {
>>> > > > > >> >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
>>> > > > > >> >     {
>>> > > > > >> >         @Override
>>> > > > > >> >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>>> > > > > >> >         {
>>> > > > > >> >             byte[] val=(value.getValue("stocks".getBytes(),"open".getBytes()));
>>> > > > > >> >             //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
>>> > > > > >> >
>>> > > > > >> >             ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable("symbol".getBytes());
>>> > > > > >> >
>>> > > > > >> >             try
>>> > > > > >> >             {
>>> > > > > >> >                 context.write(stock_symbol, new FloatWritable(Bytes.toFloat(val)));
>>> > > > > >> >             }
>>> > > > > >> >             catch(InterruptedException e)
>>> > > > > >> >             {
>>> > > > > >> >                  throw new IOException(e);
>>> > > > > >> >             }
>>> > > > > >> >         }
>>> > > > > >> >     }
>>> > > > > >> >
>>> > > > > >> >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>>> > > > > >> >     {
>>> > > > > >> >         @Override
>>> > > > > >> >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>>> > > > > >> >         {
>>> > > > > >> >             float sum=0;
>>> > > > > >> >             int count=0;
>>> > > > > >> >           //  float average=0;
>>> > > > > >> >             for(FloatWritable val : values)
>>> > > > > >> >             {
>>> > > > > >> >                 sum+=val.get();
>>> > > > > >> >                 count++;
>>> > > > > >> >             }
>>> > > > > >> >             //average=(sum/count);
>>> > > > > >> >             Put put=new Put(key.get());
>>> > > > > >> >             put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(sum/count));
>>> > > > > >> >             System.out.println("For\t"+count+"\t average is:"+(sum/count));
>>> > > > > >> >             context.write(key,put);
>>> > > > > >> >         }
>>> > > > > >> >     }
>>> > > > > >> >
>>> > > > > >> >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>>> > > > > >> >     {
>>> > > > > >> >         Configuration config=HBaseConfiguration.create();
>>> > > > > >> >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>>> > > > > >> >         Job job=new Job(config,"openstockaverage1");
>>> > > > > >> >
>>> > > > > >> >         Scan scan=new Scan();
>>> > > > > >> >         scan.addFamily("stocks".getBytes());
>>> > > > > >> >         scan.setFilter(new FirstKeyOnlyFilter());
>>> > > > > >> >
>>> > > > > >> >         TableMapReduceUtil.initTableMapperJob("nyse4",
>>> > > > > >> >                 scan,
>>> > > > > >> >                 map.class,
>>> > > > > >> >                 ImmutableBytesWritable.class,
>>> > > > > >> >                 FloatWritable.class,
>>> > > > > >> >                 job);
>>> > > > > >> >
>>> > > > > >> >         TableMapReduceUtil.initTableReducerJob("nyse5",
>>> > > > > >> >                 reduce.class,
>>> > > > > >> >                 job);
>>> > > > > >> >         //job.setReducerClass(reduce.class);
>>> > > > > >> >
>>> > > > > >> >         FileOutputFormat.setOutputPath(job, new Path(
>>> > > > > >> >                 "hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
>>> > > > > >> >         job.waitForCompletion(true);
>>> > > > > >> >     }
>>> > > > > >> > }
>>> > > > > >> >
>>> > > > > >> >
>>> > > > > >> >
>>> > > > > >> > *===> I got stuck on this error:*
>>> > > > > >> >
>>> > > > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
>>> > > > > >> > 13/08/16 03:21:46 INFO mapred.JobClient:  map 0% reduce 0%
>>> > > > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
>>> > > > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
>>> > > > > >> > 13/08/16 03:21:47 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
>>> > > > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
>>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
>>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
>>> > > > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
>>> > > > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
>>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
>>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
>>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
>>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
>>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
>>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
>>> > > > > >> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>>> > > > > >> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
>>> > > > > >> >
>>> > > > > >> >
>>> > > > > >> I cannot find where it fails.
>>> > > > > >> Can you please tell me where I went wrong?
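For the record, the trace names the problem precisely: "offset (0) + length (4) exceed the capacity of the array: 3" means Bytes.toFloat tried to read four IEEE 754 bytes from a three-byte cell, and three bytes is exactly the string "2.5" from the sample data. That is what Ted's and Jean-Marc's replies above address: parse the text with Float.parseFloat(Bytes.toString(val)), or store the opens as real floats with Bytes.toBytes(2.5f) when loading the table.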
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> Your help will be appreciated.
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> --
>>> > > > > >> Regards
>>> > > > > >>
>>> > > > > >> *Manish Dunani*
>>> > > > > >> *Contact No* : +91 9408329137
>>> > > > > >> *skype id* : manish.dunani
>>> > > > > >>
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > > >
>>> > > >
>>> > > > --
>>> > > > Regards
>>> > > >
>>> > > > *Manish Dunani*
>>> > > > *Contact No* : +91 9408329137
>>> > > > *skype id* : manish.dunani
>>> > > >
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > Regards
>>> >
>>> > *Manish Dunani*
>>> > *Contact No* : +91 9408329137
>>> > *skype id* : manish.dunani
>>> >
>>>
>>
>>
>>
>> --
>> Regards
>>
>> *Manish Dunani*
>> *Contact No* : +91 9408329137
>> *skype id* : manish.dunani
>>
>>
>>
>
>
> --
> Regards
>
> *Manish Dunani*
> *Contact No* : +91 9408329137
> *skype id* : manish.dunani
>
>
>


-- 
Regards

*Manish Dunani*
*Contact No* : +91 9408329137
*skype id* : manish.dunani
