hbase-user mailing list archives

From manish dunani <manishd...@gmail.com>
Subject Re: [Error]Finding average using hbase hadoop
Date Sun, 18 Aug 2013 02:58:53 GMT
But I want my output to look like this:


ROW                   COLUMN+CELL

 QXM                  column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
 QTM                  column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX


*Sample dataset in HBase* *(table name: nyse4)*:


 2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
 2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
 2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
 2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM


===>> In my previous output I didn't get the symbol qualifier's values as the
row key in my table:


hbase(main):004:0> scan 'nyse5'
ROW                   COLUMN+CELL
 symbol               column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
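
(Side note: value=@\xC6o\x11 is not garbage; it is the four-byte IEEE 754
encoding of the float the reducer wrote with Bytes.toBytes(average), and the
shell prints the non-printable bytes in hex. A quick sketch of decoding it,
assuming the four bytes shown above:)

import org.apache.hadoop.hbase.util.Bytes;

public class DecodeAverage
{
    public static void main(String[] args)
    {
        // '@' is 0x40 and 'o' is 0x6F, so the cell holds these four bytes
        byte[] raw = {0x40, (byte) 0xC6, 0x6F, 0x11};
        // Bytes.toFloat is the inverse of Bytes.toBytes(float)
        System.out.println(Bytes.toFloat(raw)); // prints roughly 6.201058
    }
}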

*So I changed my program as follows:*


package com.maddy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class openaveragestock
{
    public static class map extends TableMapper<Text,FloatWritable>
    {
        private static String col_family="stocks";
        private static String qul="open";

        private static String col_family1="stocks";
        private static String qul1="symbol";

        private static byte[] colfamily2=Bytes.toBytes(col_family);
        private static byte[] qul2=Bytes.toBytes(qul);

        private static byte[] colfamily3=Bytes.toBytes(col_family1);
        private static byte[] qul3=Bytes.toBytes(qul1);

//        public static float toFloat(int qul2)
//        {
//            return Float.intBitsToFloat((qul2));
//
//        }
//
        private static Text k1=new Text();


        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
        {
            //byte[] val1 = value.getValue("stocks".getBytes(), "symbol".getBytes());
            //String k = Bytes.toString(value.getValue(Bytes.toBytes("stocks"), Bytes.toBytes("symbol")));
            byte[] val = value.getValue(colfamily2, qul2);
            String k = Bytes.toString(value.getValue(colfamily3, qul3));

            //ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable(qul3);

            k1.set(k);

            try
            {
                context.write(k1, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
            }
            catch(InterruptedException e)
            {
                throw new IOException(e);
            }
        }


    }


    public static class Reduce extends Reducer<Text,FloatWritable,
Text,FloatWritable>
    {

        public void reduce(Text key,Iterable<FloatWritable>values,Context
context) throws IOException, InterruptedException
        {
            float sum=0;
            int count=0;
            float average=0;
            for(FloatWritable val:values)
            {
                sum+=val.get();
                count++;
            }
            average=(sum/count);
//            Put put=new Put(key.getBytes());
//            put.add(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"), Bytes.toBytes(average));
            System.out.println("For\t"+count+"\t average is:"+average);
            context.write(key,new FloatWritable(average));

        }

    }

    public static void main(String args[]) throws IOException,
ClassNotFoundException, InterruptedException
    {
        Configuration config=HBaseConfiguration.create();
        config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
        Job job=new Job(config,"openstockaverage1");


        Scan scan=new Scan();
        scan.addFamily("stocks".getBytes());
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob("nyse4",
                scan,
                map.class,
                ImmutableBytesWritable.class,
                FloatWritable.class,
                job);

//        TableMapReduceUtil.initTableReducerJob("nyse5",
//                Reduce.class,
//                job);
        job.setReducerClass(Reduce.class);

        FileOutputFormat.setOutputPath(job, new Path(
                "hdfs://localhost:54310/user/manish/Full_final_output_5"));
        job.waitForCompletion(true);
    }



}


*It throws this error:*

13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001
13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0
13/08/17 19:37:59 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@bd96dd
13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100
13/08/17 19:38:00 INFO mapred.JobClient:  map 0% reduce 0%
13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680
13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001
java.lang.NullPointerException
    at org.apache.hadoop.io.Text.encode(Text.java:388)
    at org.apache.hadoop.io.Text.set(Text.java:178)
    at com.maddy.openaveragestock$map.map(openaveragestock.java:59)
    at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001
13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0
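
(A note on the trace: Text.set is the line that throws, which means k was
null. With scan.setFilter(new FirstKeyOnlyFilter()) only the first KeyValue
of each row reaches the mapper, so value.getValue(colfamily3, qul3) for
stocks:symbol returns null and Bytes.toString(null) stays null. A minimal
sketch of a fix, assuming that diagnosis: drop the filter, skip rows with
missing cells, and pass Text.class as the map output key class, since the
mapper now emits Text.)

        // in main(): scan the whole family, without FirstKeyOnlyFilter
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("stocks"));

        TableMapReduceUtil.initTableMapperJob("nyse4",
                scan,
                map.class,
                Text.class,          // the map output key is Text here
                FloatWritable.class,
                job);

        // in map(): guard against rows that are missing either column
        byte[] open = value.getValue(colfamily2, qul2);
        byte[] symbol = value.getValue(colfamily3, qul3);
        if (open == null || symbol == null)
        {
            return;
        }
        k1.set(Bytes.toString(symbol));
        try
        {
            context.write(k1, new FloatWritable(Float.parseFloat(Bytes.toString(open))));
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }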





On Sun, Aug 18, 2013 at 7:55 AM, manish dunani <manishd207@gmail.com> wrote:

> *Here is the output table ===>>>*
>
>>
>> hbase(main):004:0> scan 'nyse5'
>> ROW                   COLUMN+CELL
>>  symbol               column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
>>
>
>
>
>
> *Sample output in my Eclipse console:*
>
> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
> 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
> 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
> *For    2640    average is:6.201058*
>
>
> On Sat, Aug 17, 2013 at 11:59 PM, Jean-Marc Spaggiari <
> jean-marc@spaggiari.org> wrote:
>
>> Are you outputting to a table? From your code, I don't see any output
>> configured.
>>
>> 2013/8/17 manish dunani <manishd207@gmail.com>
>>
>> > Thanks a lot, Jean!
>> >
>> > I am very thankful to you. And of course Ted is also doing a very good job.
>> >
>> > *Revised code:*
>> >
>> > package com.maddy;
>> > >
>> > > import java.io.IOException;
>> > >
>> > > import org.apache.hadoop.conf.Configuration;
>> > > import org.apache.hadoop.fs.Path;
>> > > import org.apache.hadoop.hbase.HBaseConfiguration;
>> > > import org.apache.hadoop.hbase.client.Put;
>> > > import org.apache.hadoop.hbase.client.Result;
>> > > import org.apache.hadoop.hbase.client.Scan;
>> > > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>> > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>> > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>> > > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>> > > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>> > > import org.apache.hadoop.hbase.util.Bytes;
>> > > //import org.apache.hadoop.io.DoubleWritable;
>> > > import org.apache.hadoop.io.FloatWritable;
>> > > import org.apache.hadoop.mapreduce.Job;
>> > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> > >
>> > >
>> > > public class openaveragestock
>> > > {
>> > >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
>> > >     {
>> > >         private static String col_family="stocks";
>> > >         private static String qul="open";
>> > >
>> > >         private static String col_family1="stocks";
>> > >         private static String qul1="symbol";
>> > >
>> > >         private static byte[] colfamily2=Bytes.toBytes(col_family);
>> > >         private static byte[] qul2=Bytes.toBytes(qul);
>> > >
>> > >         private static byte[] colfamily3=Bytes.toBytes(col_family1);
>> > >         private static byte[] qul3=Bytes.toBytes(qul1);
>> > >
>> > > //        public static float toFloat(int qul2)
>> > > //        {
>> > > //            return Float.intBitsToFloat((qul2));
>> > > //
>> > > //        }
>> > > //
>> > >
>> > >
>> > >
>> > >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>> > >         {
>> > >
>> > >
>> > >             //byte[] val1 = value.getValue("stocks".getBytes(), "symbol".getBytes());
>> > >             byte[] val = value.getValue(colfamily2, qul2);
>> > >
>> > >
>> > >             ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable(qul3);
>> > >
>> > >
>> > >
>> > >             try
>> > >             {
>> > >
>> > >                 context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>> > >             }
>> > >
>> > >             catch(InterruptedException e)
>> > >
>> > >             {
>> > >                  throw new IOException(e);
>> > >             }
>> > >
>> > >
>> > >         }
>> > >
>> > >
>> > >     }
>> > >
>> > >
>> > >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>> > >     {
>> > >
>> > >         @Override
>> > >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>> > >         {
>> > >             float sum=0;
>> > >             int count=0;
>> > >             float average=0;
>> > >             for(FloatWritable val:values)
>> > >             {
>> > >                 sum+=val.get();
>> > >                 count++;
>> > >             }
>> > >             average=(sum/count);
>> > >             Put put=new Put(key.get());
>> > >             put.add(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"), Bytes.toBytes(average));
>> > >             System.out.println("For\t"+count+"\t average is:"+average);
>> > >             context.write(key,put);
>> > >
>> > >         }
>> > >
>> > >     }
>> > >
>> > >     public static void main(String args[]) throws IOException,
>> > > ClassNotFoundException, InterruptedException
>> > >     {
>> > >         Configuration config=HBaseConfiguration.create();
>> > >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>> > >         Job job=new Job(config,"openstockaverage1");
>> > >
>> > >
>> > >         Scan scan=new Scan();
>> > >         scan.addFamily("stocks".getBytes());
>> > >         scan.setFilter(new FirstKeyOnlyFilter());
>> > >
>> > >         TableMapReduceUtil.initTableMapperJob("nyse4",
>> > >                 scan,
>> > >                 map.class,
>> > >                 ImmutableBytesWritable.class,
>> > >                 FloatWritable.class,
>> > >                 job);
>> > >
>> > >         TableMapReduceUtil.initTableReducerJob("nyse5",
>> > >                 reduce.class,
>> > >                 job);
>> > >         //job.setReducerClass(reduce.class);
>> > >
>> > >         //FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:54310/user/manish/final_hbase_hadoop"));
>> > >         job.waitForCompletion(true);
>> > >     }
>> > >
>> > >
>> > >
>> > > }
>> > >
>> > >
>> > *Sample output in my Eclipse console:*
>> >
>> > > 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
>> > > 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
>> > > 13/08/17 07:27:21 INFO mapred.JobClient:  map 100% reduce 0%
>> > > *For    2640    average is:6.201058*
>> > > 13/08/17 07:27:21 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce
>> > > 13/08/17 07:27:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
>> > > 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is null in cleanup
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:  map 100% reduce 100%
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   HBase Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_CALLS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_CALLS=2643
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     RPC_RETRIES=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=5849
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_RESULTS=126719
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REGIONS_SCANNED=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Output Format Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Written=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   FileSystemCounters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_READ=24176810
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=24552208
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   File Input Format Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Bytes Read=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:   Map-Reduce Framework
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input groups=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output materialized bytes=42246
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine output records=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map input records=2640
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce shuffle bytes=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce output records=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Spilled Records=5280
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output bytes=36960
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     CPU time spent (ms)=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Total committed heap usage (bytes)=321527808
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Combine input records=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Map output records=2640
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     SPLIT_RAW_BYTES=65
>> > > 13/08/17 07:27:22 INFO mapred.JobClient:     Reduce input records=2640
>> > >
>> >
>> > *Question (please don't laugh at me if I ask a silly question):*
>> >
>> > Here in the code I set an output directory, but when I look in the HDFS
>> > directory it does not contain any part-0000 file. It contains only a
>> > SUCCESS file.
>> >
>> > Can I ask why that happens?
>> >
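
(On the SUCCESS-file question: when a job is wired through
TableMapReduceUtil.initTableReducerJob, the reducer's Puts go to the HBase
table via TableOutputFormat rather than to HDFS files, so the output
directory only ever receives the _SUCCESS marker and no part-r-00000 files.
A hedged sketch of reading the stored average back from the table instead,
using the HBase 0.94 client API; the "symbol" row key matches the scan
output shown earlier:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadAverage
{
    public static void main(String[] args) throws Exception
    {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "nyse5");
        // the reducer key became the row key of the output table
        Result r = table.get(new Get(Bytes.toBytes("symbol")));
        byte[] avg = r.getValue(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"));
        if (avg != null)
        {
            System.out.println(Bytes.toFloat(avg)); // decodes the IEEE 754 bytes
        }
        table.close();
    }
}
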
>> > On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari <
>> > jean-marc@spaggiari.org> wrote:
>> >
>> > > Hi Manish.
>> > >
>> > > Looking a bit more at this, I think the issue is that your "floats" are
>> > > written in your table as strings and not as floats...
>> > >
>> > > Can you try something like:
>> > > context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>> > >
>> > > Also, as asked previously: "Can you please paste your code on pastebin?
>> > > Same for the exception."
>> > >
>> > > Thanks,
>> > >
>> > > JM
>> > >
>> > > 2013/8/17 manish dunani <manishd207@gmail.com>
>> > >
>> > > > Hey Jean,
>> > > > I did it according to your suggestion.
>> > > > I converted as you told me, but I still face the same error.
>> > > >
>> > > > And Ted, I am new to this and don't see how I can use this method.
>> > > > Can you please show me?
>> > > >
>> > > > Your help will be appreciated.
>> > > >
>> > > >
>> > > >
>> > > > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <yuzhihong@gmail.com>
>> wrote:
>> > > >
>> > > > > Here is the javadoc for toFloat():
>> > > > >
>> > > > >    * Presumes float encoded as IEEE 754 floating-point "single format"
>> > > > >    * @param bytes byte array
>> > > > >    * @return Float made from passed byte array.
>> > > > >    */
>> > > > >   public static float toFloat(byte [] bytes) {
>> > > > >
>> > > > > So for values like '2.5', toFloat() is not the proper method.
>> > > > >
>> > > > > You can use the float conversion provided by Java.
>> > > > >
>> > > > >
>> > > > > Cheers
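
(To make the mismatch concrete: the table stores "2.5" as a three-byte ASCII
string, while Bytes.toFloat() expects the four-byte IEEE 754 encoding; that
is why the original trace says "offset (0) + length (4) exceed the capacity
of the array: 3". A small sketch contrasting the two conversions:)

import org.apache.hadoop.hbase.util.Bytes;

public class FloatVsString
{
    public static void main(String[] args)
    {
        byte[] val = Bytes.toBytes("2.5");   // three bytes: 0x32 0x2E 0x35

        // right for string-encoded numbers: parse the text
        float f = Float.parseFloat(Bytes.toString(val)); // 2.5f
        System.out.println(f);

        // wrong for this data: expects exactly 4 IEEE 754 bytes, and throws
        // IllegalArgumentException on a 3-byte array
        // float g = Bytes.toFloat(val);
    }
}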
>> > > > >
>> > > > >
>> > > > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <yuzhihong@gmail.com>
>> wrote:
>> > > > >
>> > > > > > Here is the code from Bytes:
>> > > > > >
>> > > > > >   public static float toFloat(byte [] bytes, int offset) {
>> > > > > >     return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
>> > > > > >
>> > > > > > Looking at your sample data:
>> > > > > >
>> > > > > >  2010-02-04           column=stocks:open, timestamp=1376567559424, value=*2.5*
>> > > > > >
>> > > > > > The length of the value didn't match SIZEOF_INT.
>> > > > > >
>> > > > > > It seems you need to validate the values first.
>> > > > > >
>> > > > > >
>> > > > > > Cheers
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani <
>> > manishd207@gmail.com
>> > > > > >wrote:
>> > > > > >
>> > > > > >> hello,
>> > > > > >>
>> > > > > >> I am using Apache Hadoop 1.1.2 and HBase 0.94.9 in pseudo-distributed mode.
>> > > > > >>
>> > > > > >> I am trying to find the average of the open stock values.
>> > > > > >>
>> > > > > >> *sample dataset in hbase:* *(table name: nyse4)*
>> > > > > >>
>> > > > > >>  2010-02-04           column=stocks:open, timestamp=1376567559424, value=2.5
>> > > > > >>  2010-02-04           column=stocks:symbol, timestamp=1376567559424, value=QXM
>> > > > > >>  2010-02-05           column=stocks:open, timestamp=1376567559429, value=2.42
>> > > > > >>  2010-02-05           column=stocks:symbol, timestamp=1376567559429, value=QXM
>> > > > > >>  2010-02-08           column=stocks:open, timestamp=1376567559431, value=2.33
>> > > > > >>  2010-02-08           column=stocks:symbol, timestamp=1376567559431, value=QXM
>> > > > > >>
>> > > > > >> *code:* (please ignore the commented lines)
>> > > > > >>
>> > > > > >>
>> > > > > >> > package com.maddy;
>> > > > > >> >
>> > > > > >> > import java.io.IOException;
>> > > > > >> >
>> > > > > >> > import org.apache.hadoop.conf.Configuration;
>> > > > > >> > import org.apache.hadoop.fs.Path;
>> > > > > >> > import org.apache.hadoop.hbase.HBaseConfiguration;
>> > > > > >> > import org.apache.hadoop.hbase.client.Put;
>> > > > > >> > import org.apache.hadoop.hbase.client.Result;
>> > > > > >> > import org.apache.hadoop.hbase.client.Scan;
>> > > > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>> > > > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>> > > > > >> > import org.apache.hadoop.hbase.util.Bytes;
>> > > > > >> > //import org.apache.hadoop.io.DoubleWritable;
>> > > > > >> > import org.apache.hadoop.io.FloatWritable;
>> > > > > >> > import org.apache.hadoop.mapreduce.Job;
>> > > > > >> > import
>> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > public class openaveragestock
>> > > > > >> > {
>> > > > > >> >     public static class map extends TableMapper<ImmutableBytesWritable,FloatWritable>
>> > > > > >> >     {
>> > > > > >> >         @Override
>> > > > > >> >
>> > > > > >> >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>> > > > > >> >         {
>> > > > > >> >
>> > > > > >> >             byte[] val = value.getValue("stocks".getBytes(), "open".getBytes());
>> > > > > >> >             //byte[] val1 = value.getValue("stocks".getBytes(), "symbol".getBytes());
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >             ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable("symbol".getBytes());
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >             try
>> > > > > >> >             {
>> > > > > >> >                 context.write(stock_symbol, new FloatWritable(Bytes.toFloat(val)));
>> > > > > >> >             }
>> > > > > >> >             catch(InterruptedException e)
>> > > > > >> >
>> > > > > >> >             {
>> > > > > >> >                  throw new IOException(e);
>> > > > > >> >             }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >         }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >     }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >     public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>> > > > > >> >     {
>> > > > > >> >
>> > > > > >> >         @Override
>> > > > > >> >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>> > > > > >> >         {
>> > > > > >> >             float sum=0;
>> > > > > >> >             int count=0;
>> > > > > >> >           //  float average=0;
>> > > > > >> >             for(FloatWritable val:values)
>> > > > > >> >             {
>> > > > > >> >                 sum+=val.get();
>> > > > > >> >                 count++;
>> > > > > >> >             }
>> > > > > >> >             //average=(sum/count);
>> > > > > >> >             Put put=new Put(key.get());
>> > > > > >> >             put.add(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"), Bytes.toBytes(sum/count));
>> > > > > >> >             System.out.println("For\t"+count+"\t average is:"+(sum/count));
>> > > > > >> >             context.write(key,put);
>> > > > > >> >
>> > > > > >> >         }
>> > > > > >> >
>> > > > > >> >     }
>> > > > > >> >
>> > > > > >> >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>> > > > > >> >     {
>> > > > > >> >         Configuration config=HBaseConfiguration.create();
>> > > > > >> >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>> > > > > >> >         Job job=new Job(config,"openstockaverage1");
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >         Scan scan=new Scan();
>> > > > > >> >         scan.addFamily("stocks".getBytes());
>> > > > > >> >         scan.setFilter(new FirstKeyOnlyFilter());
>> > > > > >> >
>> > > > > >> >         TableMapReduceUtil.initTableMapperJob("nyse4",
>> > > > > >> >                 scan,
>> > > > > >> >                 map.class,
>> > > > > >> >                 ImmutableBytesWritable.class,
>> > > > > >> >                 FloatWritable.class,
>> > > > > >> >                 job);
>> > > > > >> >
>> > > > > >> >         TableMapReduceUtil.initTableReducerJob("nyse5",
>> > > > > >> >                 reduce.class,
>> > > > > >> >                 job);
>> > > > > >> >         //job.setReducerClass(reduce.class);
>> > > > > >> >
>> > > > > >> >         FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
>> > > > > >> >         job.waitForCompletion(true);
>> > > > > >> >     }
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > *===> Got stuck on this error:*
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
>> > > > > >> > 13/08/16 03:21:46 INFO mapred.JobClient:  map 0% reduce 0%
>> > > > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
>> > > > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
>> > > > > >> > 13/08/16 03:21:47 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
>> > > > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
>> > > > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
>> > > > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
>> > > > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
>> > > > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
>> > > > > >> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>> > > > > >> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>> > > > > >> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
>> > > > > >> >
>> > > > > >> >
>> > > > > >> I cannot find where it fails. Can you please tell me?
>> > > > > >> Where did I go wrong?
>> > > > > >>
>> > > > > >>
>> > > > > >> Your help will be appreciated.
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >> --
>> > > > > >> Regards
>> > > > > >>
>> > > > > >> *Manish Dunani*
>> > > > > >> *Contact No* : +91 9408329137
>> > > > > >> *skype id* : manish.dunani
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Regards
>> > > >
>> > > > *Manish Dunani*
>> > > > *Contact No* : +91 9408329137
>> > > > *skype id* : manish.dunani
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Regards
>> >
>> > *Manish Dunani*
>> > *Contact No* : +91 9408329137
>> > *skype id* : manish.dunani
>> >
>>
>
>
>
> --
> Regards
>
> *Manish Dunani*
> *Contact No* : +91 9408329137
> *skype id* : manish.dunani
>
>
>


-- 
Regards

*Manish Dunani*
*Contact No* : +91 9408329137
*skype id* : manish.dunani
