hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Asaf Mesika <asaf.mes...@gmail.com>
Subject Re: problem in testing coprocessor endpoint
Date Fri, 12 Jul 2013 04:07:37 GMT
The only way to register endpoint coprocessor jars is by placing them in
lib dir if hbase and modifying hbase-site.xml to point to it under a
property name I forgot at the moment.
What you described is a way to register an Observer type coprocessor.


On Friday, July 12, 2013, ch huang wrote:

> i am testing coprocessor endpoint function, here is my testing process ,and
> error i get ,hope any expert on coprocessor can help me out
>
>
> # vi ColumnAggregationProtocol.java
>
> import java.io.IOException;
> import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
> // A sample protocol for performing aggregation at regions.
> public interface ColumnAggregationProtocol
> extends CoprocessorProtocol {
> // Perform aggregation for a given column at the region. The aggregation
> // will include all the rows inside the region. It can be extended to
> // allow passing start and end rows for a fine-grained aggregation.
>    public long sum(byte[] family, byte[] qualifier) throws IOException;
> }
>
>
> # vi ColumnAggregationEndpoint.java
>
>
> import java.io.FileWriter;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.hadoop.hbase.CoprocessorEnvironment;
> import org.apache.hadoop.hbase.KeyValue;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
> import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
> import org.apache.hadoop.hbase.ipc.ProtocolSignature;
> import org.apache.hadoop.hbase.regionserver.HRegion;
> import org.apache.hadoop.hbase.regionserver.InternalScanner;
> import org.apache.hadoop.hbase.util.Bytes;
>
> //Aggregation implementation at a region.
>
> public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
>   implements ColumnAggregationProtocol {
>      @Override
>      public long sum(byte[] family, byte[] qualifier)
>      throws IOException {
>        // aggregate at each region
>          Scan scan = new Scan();
>          scan.addColumn(family, qualifier);
>          long sumResult = 0;
>
>          CoprocessorEnvironment ce = getEnvironment();
>          HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
>          InternalScanner scanner = hr.getScanner(scan);
>
>          try {
>            List<KeyValue> curVals = new ArrayList<KeyValue>();
>            boolean hasMore = false;
>            do {
>          curVals.clear();
>          hasMore = scanner.next(curVals);
>          KeyValue kv = curVals.get(0);
>          sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
>
>            } while (hasMore);
>          } finally {
>              scanner.close();
>          }
>          return sumResult;
>       }
>
>       @Override
>       public long getProtocolVersion(String protocol, long clientVersion)
>              throws IOException {
>          // TODO Auto-generated method stub
>          return 0;
>       }
>
>       @Override
>
>       public ProtocolSignature getProtocolSignature(String protocol,
>              long clientVersion, int clientMethodsHash) throws IOException
> {
>           // TODO Auto-generated method stub
>           return null;
>       }
> }
>
> i compile and pack the two into test.jar,and put it into my HDFS filesystem
>
> and load it into my test table
>
> hbase(main):006:0> alter 'mytest', METHOD =>
> 'table_att','coprocessor'=>'hdfs:///
> 192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001'
>
> here is my testing java code
>
> package com.testme.demo;
> import java.io.IOException;
> import java.util.Map;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.HTableDescriptor;
> import org.apache.hadoop.hbase.client.*;
> import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
> import org.apache.hadoop.hbase.util.*;
> import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;
>
> public class TestCop {
>    private static Configuration conf =null;
>    private static String TEST_TABLE = "mytest";
>    private static String TEST_FAMILY = "myfl";
>    private static String TEST_QUALIFIER = "myqf";
>  /**
>   * @param args
>   */
>    static {
>           conf = HBaseConfiguration.create();
>           conf.addResource( "hbase-site.xml");
>    }
>
>  public static void main(String[] args) throws IOException,Throwable{
>   // TODO Auto-generated method stub
>   conf = HBaseConfiguration.create();
>
>    HTable table = new HTable(conf,TEST_TABLE);
>  //  HTableDescriptor htd = table.getTableDescriptor();
>
>    Scan scan = new Scan();
>    Map<byte[], Long> results;
>
>    results = table.coprocessorExec(ColumnAggregationProtocol.class,
> "1".getBytes(),"5".getBytes(), new Call<ColumnAggregationProtocol,Long>(){
>      public Long call(ColumnAggregationProtocol instance)throws
> IOException{
>        return (Long) instance.sum(TEST_FAMILY.getBytes(),
> TEST_QUALIFIER.getBytes());
>     }});
>
>    long sumResult = 0;
>    long expectedResult = 0;
>    for (Map.Entry<byte[], Long> e:results.entrySet()){
>     sumResult += e.getValue();
>    }
>    System.out.println(sumResult);
>  }
> }
> when i run it i get error
> Exception in thread "main"
> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
> handler for protocol
> org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
>  at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
>  at
>
> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>  at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>  at java.lang.reflect.Method.invoke(Method.java:597)
>  at
>
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
>  at
> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
>  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
> Source)
>  at java.lang.reflect.Constructor.newInstance(Unknown Source)
>  at
>
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
>  at
>
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:79)
>  at
>
> org.apache.hadoop.hbase.client.ServerCallable.translateException(ServerCallable.java:228)
>  at
>
> org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:166)
>  at
> org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79)
>  at com.sun.proxy.$Proxy8.sum(Unknown Source)
>  at com.testme.demo.TestCop$1.call(TestCop.java:41)
>  at com.testme.demo.TestCop$1.call(TestCop.java:1)
>  at
>
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1466)
>  at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
>  at java.util.concurrent.FutureTask.run(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>  at java.lang.Thread.run(Unknown Source)
> Caused by:
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException):
> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
> handler for protocol
> org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
>  at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
>  at
>
> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>  at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>  at java.lang.reflect.Method.invoke(Method.java:597)
>  at
>
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
>  at
> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
>  at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:995)
>  at
>
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:86)
>  at com.sun.proxy.$Proxy7.execCoprocessor(Unknown Source)
>  at
> org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:75)
>  at
> org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:73)
>  at
>
> org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:163)
>  ... 10 more
>
> hbase(main):020:0> describe
> 'mytest'
> DESCRIPTION                                          ENABLED
>  {NAME => 'mytest', coprocessor$1 => 'hdfs:///192.16 true
>  8.10.22:9000/alex/test.jar|ColumnAggregationEndpoin
>  t|1001', FAMILIES => [{NAME => 'myfl'
>  , DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NO
>  NE', REPLICATION_SCOPE => '0', VERSIONS => '3', COM
>  PRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '21
>  47483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE
>   => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK =
>  > 'true', BLOCKCACHE => 'true'}]}
> 1 row(s) in 0.0920 seconds
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message