From Pankaj Misra <pankaj.mi...@impetus.co.in>
Subject HBase NonBlocking and Async Thrift
Date Mon, 19 Nov 2012 12:16:38 GMT
Dear All,

I am currently using Hadoop 0.23.1 with HBase 0.94.1 in pseudo-distributed mode. I am trying
to use the HBase Thrift API (not Thrift2 yet) in nonblocking, async mode to bulk-insert
records. I am sharing the steps I took, both for everyone's information and to set the
context for my problem.

Please find below the code that I am using to initialize the async client:

TBinaryProtocol.Factory binProtoFactory=new TBinaryProtocol.Factory();

TAsyncClientManager clientManager=null;
TNonblockingSocket nonBlockingSocket=null;

try {
  clientManager=new TAsyncClientManager();
} catch (IOException e) {
  throw new RuntimeException(e);
}
try {
  nonBlockingSocket=new TNonblockingSocket(HOST_NAME,PORT_NUMBER);
} catch (IOException e) {
  throw new RuntimeException(e);
}

I then initialize the client as shown below:

Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);

I could see two ways of using the client: one client for all the records to be inserted,
or a separate client instance for every record. Since this is a non-blocking channel, I
thought it would make sense to initialize one client for all the requests, because all the
requests would be streamed using a framed transport.

// 1 async client for all the requests
Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);

// mutate rows called in a loop to insert multiple records, using the same client
client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());

But I soon found that I was wrong, as I got back the following error:

java.lang.IllegalStateException: Client is currently executing another method: org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient$mutateRow_call
    at org.apache.thrift.async.TAsyncClient.checkReady(TAsyncClient.java:78)
    at org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient.mutateRow(Hbase.java:2714)

Reading through the following JIRA educated me a bit more on this


So, I changed my code to initialize a new client for every record to be inserted.

// both of these statements are called in a loop, once for every record to be inserted
Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());

Even this failed, with the following error:
2012-11-19 17:25:15,275 WARN  [TAsyncClientManager#SelectorThread 9] async.TAsyncClientManager
(TAsyncClientManager.java:startPendingMethods(177)) - Caught exception in TAsyncClientManager!
    at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
    at java.nio.channels.SelectableChannel.register(SelectableChannel.java:254)
    at org.apache.thrift.transport.TNonblockingSocket.registerSelector(TNonblockingSocket.java:99)
    at org.apache.thrift.async.TAsyncMethodCall.start(TAsyncMethodCall.java:141)
    at org.apache.thrift.async.TAsyncClientManager$SelectThread.startPendingMethods(TAsyncClientManager.java:169)
    at org.apache.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)

So, it looked to me as if the channel registration could not complete in time. Since the
records are inserted in a tight loop, each call possibly needs a time window for the
initialization to finish before the record can be inserted. So I had to introduce a delay
after every insertion, which I would prefer not to do. The code changes for that are shown
below:

Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());
synchronized (client.getProtocolFactory()) {

With the above change, I could see the records getting inserted into HBase using the async
Thrift client, but I do not think this is the right solution. I am looking for guidance
from the community on a more consistent way to use the async Thrift capability without any
specific wait or sleep times, since a wait call kills the async advantage and reduces the
overall throughput. Looking forward to your help.
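
One direction that might avoid the sleep, sketched under assumptions rather than
tested against HBase: since the first error shows that a client accepts only one call at a
time, and the second error likely comes from many clients registering the same
nonBlockingSocket, a fixed pool of clients could be used, each created with its own
TNonblockingSocket. A client is borrowed before mutateRow and handed back from the callback
when the call completes. The plain-Java sketch below shows only the pooling part;
AsyncClientPool, borrow, giveBack and idleCount are hypothetical names and are not part of
Thrift or HBase.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical helper (not part of Thrift or HBase): a fixed-size pool of
 * clients that each allow only one call in flight.
 */
final class AsyncClientPool<C> {
    // Clients that currently have no call in flight.
    private final BlockingQueue<C> idle;

    AsyncClientPool(Collection<C> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("pool needs at least one client");
        }
        // Capacity equals the pool size, so the queue can never hold extra clients.
        idle = new ArrayBlockingQueue<>(clients.size(), false, clients);
    }

    /** Blocks until some client is idle, then hands it to the caller. */
    C borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an idle client", e);
        }
    }

    /** Meant to be called from the completion callback once the call has finished. */
    void giveBack(C client) {
        if (!idle.offer(client)) {
            throw new IllegalStateException("pool is already full: more returns than borrows");
        }
    }

    /** Number of clients that are idle right now. */
    int idleCount() {
        return idle.size();
    }
}
```

In the insert loop this would be used as: client = pool.borrow(), then client.mutateRow(...)
with a handler whose onComplete calls pool.giveBack(client). The pool size bounds the number
of requests in flight, so the loop only blocks when every client is busy. One caveat, as an
assumption about Thrift's behaviour: a client whose call failed may not be reusable, so the
error path would probably need to hand back a freshly created client instead.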

Thanks and Regards
Pankaj Misra


