kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [1/2] kudu git commit: java: refactor KuduRpc so it isn't responsible for serialization
Date Wed, 15 Feb 2017 22:48:30 GMT
Repository: kudu
Updated Branches:
  refs/heads/master daf289d6b -> 987cbc23f


java: refactor KuduRpc so it isn't responsible for serialization

This removes KuduRpc.serialize() and replaces it with
KuduRpc.createRequestPB(). This means that we are now sending
RpcOutboundMessages up the Netty pipeline all the way, which makes
debugging a bit easier. It also helps further conceptually separate
serialization concerns from RPC implementations.

Further, outbound RPCs can now be logged with their actual protobuf
contents instead of their bytewise form.

Change-Id: Ifc1a86a1599d936da7461da98bff2f8663de18fb
Reviewed-on: http://gerrit.cloudera.org:8080/6016
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/79ffb17d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/79ffb17d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/79ffb17d

Branch: refs/heads/master
Commit: 79ffb17d7f4f324b4cace9cd4255cad4f11bef4d
Parents: daf289d
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Feb 15 12:06:32 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Feb 15 21:16:59 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AlterTableRequest.java   |  6 ++---
 .../org/apache/kudu/client/AsyncKuduClient.java |  3 +--
 .../apache/kudu/client/AsyncKuduScanner.java    |  8 +++---
 .../main/java/org/apache/kudu/client/Batch.java |  5 ++--
 .../org/apache/kudu/client/CallResponse.java    |  8 ------
 .../apache/kudu/client/CreateTableRequest.java  |  6 ++---
 .../apache/kudu/client/DeleteTableRequest.java  |  6 ++---
 .../client/GetMasterRegistrationRequest.java    |  8 ++----
 .../kudu/client/GetTableLocationsRequest.java   |  5 ++--
 .../kudu/client/GetTableSchemaRequest.java      |  6 ++---
 .../kudu/client/IsAlterTableDoneRequest.java    |  6 ++---
 .../kudu/client/IsCreateTableDoneRequest.java   |  5 ++--
 .../java/org/apache/kudu/client/KuduRpc.java    |  5 ++--
 .../apache/kudu/client/ListTablesRequest.java   |  6 ++---
 .../kudu/client/ListTabletServersRequest.java   |  8 ++----
 .../apache/kudu/client/ListTabletsRequest.java  |  8 ++----
 .../java/org/apache/kudu/client/Operation.java  |  5 ++--
 .../org/apache/kudu/client/PingRequest.java     |  8 ++----
 .../apache/kudu/client/RpcOutboundMessage.java  | 16 +++++++++++
 .../org/apache/kudu/client/TabletClient.java    | 28 +++++++++-----------
 20 files changed, 63 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index e1921bb..15c8943 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.util.Pair;
@@ -54,11 +53,10 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     TableIdentifierPB tableID = TableIdentifierPB.newBuilder().setTableName(name).build();
     this.builder.setTable(tableID);
-    return toChannelBuffer(header, this.builder.build());
+    return this.builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index c3fefab..0dd32b0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -55,7 +55,6 @@ import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
 
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioWorkerPool;
@@ -429,7 +428,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // timeouts and use its Deferred.
     final KuduRpc<KuduTable> fakeRpc = new KuduRpc<KuduTable>(null) {
       @Override
-      ChannelBuffer serialize(Message header) {
+      Message createRequestPB() {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index b2b73a5..f9ee7a3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -42,7 +42,6 @@ import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -734,7 +733,8 @@ public final class AsyncKuduScanner {
     }
 
     /** Serializes this request.  */
-    ChannelBuffer serialize(Message header) {
+    @Override
+    Message createRequestPB() {
       final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
       switch (state) {
         case OPENING:
@@ -789,9 +789,7 @@ public final class AsyncKuduScanner {
           throw new RuntimeException("unreachable!");
       }
 
-      ScanRequestPB request = builder.build();
-
-      return toChannelBuffer(header, request);
+      return builder.build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index d50aed7..b2f1ae9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.annotations.InterfaceAudience;
@@ -87,14 +86,14 @@ class Batch extends KuduRpc<BatchResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
+  Message createRequestPB() {
     final Tserver.WriteRequestPB.Builder builder =
         Operation.createAndFillWriteRequestPB(operations);
     rowOperationsSizeBytes = builder.getRowOperations().getRows().size() +
                              builder.getRowOperations().getIndirectData().size();
     builder.setTabletId(ZeroCopyLiteralByteString.wrap(getTablet().getTabletIdAsBytes()));
     builder.setExternalConsistencyMode(externalConsistencyMode.pbVersion());
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index 17007c8..cad01a4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -19,8 +19,6 @@ package org.apache.kudu.client;
 
 import java.util.List;
 
-import com.google.protobuf.Message;
-
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -79,12 +77,6 @@ final class CallResponse {
     return this.totalResponseSize;
   }
 
-  public <T extends Message> T parseResponse(T prototypeInstance) {
-    prototypeInstance.newBuilderForType();
-    return prototypeInstance;
-
-  }
-
   /**
    * @return A slice pointing to the section of the packet reserved for the main
    * protobuf message.

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index 05f5c77..b647904 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.List;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.annotations.InterfaceAudience;
@@ -51,11 +50,10 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     this.builder.setName(this.name);
     this.builder.setSchema(ProtobufHelper.schemaToPb(this.schema));
-    return toChannelBuffer(header, this.builder.build());
+    return this.builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
index 83da304..542d803 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
@@ -18,7 +18,6 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.master.Master;
@@ -40,13 +39,12 @@ class DeleteTableRequest extends KuduRpc<DeleteTableResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     final Master.DeleteTableRequestPB.Builder builder = Master.DeleteTableRequestPB.newBuilder();
     Master.TableIdentifierPB tableID =
         Master.TableIdentifierPB.newBuilder().setTableName(name).build();
     builder.setTable(tableID);
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
index b0df178..bf5c864 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
@@ -23,7 +23,6 @@ import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
 import static org.apache.kudu.master.Master.MasterErrorPB;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.util.Pair;
@@ -40,11 +39,8 @@ public class GetMasterRegistrationRequest extends KuduRpc<GetMasterRegistrationR
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
-    final GetMasterRegistrationRequestPB.Builder builder =
-        GetMasterRegistrationRequestPB.newBuilder();
-    return toChannelBuffer(header, builder.build());
+  Message createRequestPB() {
+    return GetMasterRegistrationRequestPB.getDefaultInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index e7d4637..f0f67e9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -20,7 +20,6 @@ package org.apache.kudu.client;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.master.Master;
@@ -75,7 +74,7 @@ class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponseP
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
+  Message createRequestPB() {
     final Master.GetTableLocationsRequestPB.Builder builder = Master
         .GetTableLocationsRequestPB.newBuilder();
     builder.setTable(Master.TableIdentifierPB.newBuilder()
@@ -87,6 +86,6 @@ class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponseP
       builder.setPartitionKeyEnd(ZeroCopyLiteralByteString.wrap(endKey));
     }
     builder.setMaxReturnedLocations(maxReturnedLocations);
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index 07e3061..62ff262 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -22,7 +22,6 @@ import static org.apache.kudu.master.Master.GetTableSchemaResponsePB;
 import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.annotations.InterfaceAudience;
@@ -43,13 +42,12 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse>
{
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     final GetTableSchemaRequestPB.Builder builder = GetTableSchemaRequestPB.newBuilder();
     TableIdentifierPB tableID =
         TableIdentifierPB.newBuilder().setTableName(name).build();
     builder.setTable(tableID);
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
index 4b205cd..810e589 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -22,7 +22,6 @@ import static org.apache.kudu.master.Master.IsAlterTableDoneResponsePB;
 import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.util.Pair;
@@ -43,13 +42,12 @@ class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse>
{
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     final IsAlterTableDoneRequestPB.Builder builder = IsAlterTableDoneRequestPB.newBuilder();
     TableIdentifierPB tableID =
         TableIdentifierPB.newBuilder().setTableName(name).build();
     builder.setTable(tableID);
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
index 42866b7..46ac392 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -19,7 +19,6 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.master.Master;
@@ -60,11 +59,11 @@ class IsCreateTableDoneRequest extends KuduRpc<Master.IsCreateTableDoneResponseP
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
+  Message createRequestPB() {
     final Master.IsCreateTableDoneRequestPB.Builder builder = Master
         .IsCreateTableDoneRequestPB.newBuilder();
     builder.setTable(Master.TableIdentifierPB.newBuilder().setTableId(
         ByteString.copyFromUtf8(tableId)));
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 1ba909d..4a8b971 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
 import com.stumbleupon.async.Deferred;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -149,7 +150,7 @@ public abstract class KuduRpc<R> {
    * Notice that this method is package-private, so only classes within this
    * package can use this as a base class.
    */
-  abstract ChannelBuffer serialize(Message header);
+  abstract Message createRequestPB();
 
   /**
    * Package private way of getting the name of the RPC service.
@@ -367,7 +368,7 @@ public abstract class KuduRpc<R> {
   }
 
   static void readProtobuf(final Slice slice,
-      final com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      final Builder builder) {
     final int length = slice.length();
     final byte[] payload = slice.getRawArray();
     final int offset = slice.getRawOffset();

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index 2962264..8de57cc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.master.Master;
@@ -38,14 +37,13 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
+  Message createRequestPB() {
     final Master.ListTablesRequestPB.Builder builder =
         Master.ListTablesRequestPB.newBuilder();
     if (nameFilter != null) {
       builder.setNameFilter(nameFilter);
     }
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
index 68683b7..7ee244c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.util.Pair;
@@ -37,11 +36,8 @@ public class ListTabletServersRequest extends KuduRpc<ListTabletServersResponse>
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
-    final ListTabletServersRequestPB.Builder builder =
-        ListTabletServersRequestPB.newBuilder();
-    return toChannelBuffer(header, builder.build());
+  Message createRequestPB() {
+    return ListTabletServersRequestPB.getDefaultInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
index 80f24b3..f4ec511 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.tserver.Tserver;
@@ -35,11 +34,8 @@ class ListTabletsRequest extends KuduRpc<ListTabletsResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
-    final Tserver.ListTabletsRequestPB.Builder builder =
-        Tserver.ListTabletsRequestPB.newBuilder();
-    return toChannelBuffer(header, builder.build());
+  Message createRequestPB() {
+    return Tserver.ListTabletsRequestPB.getDefaultInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index a8925cf..f11e5ff 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -128,7 +127,7 @@ public abstract class Operation extends KuduRpc<OperationResponse>
{
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
+  Message createRequestPB() {
     final Tserver.WriteRequestPB.Builder builder =
         createAndFillWriteRequestPB(ImmutableList.of(this));
     this.rowOperationSizeBytes = builder.getRowOperations().getRows().size() +
@@ -138,7 +137,7 @@ public abstract class Operation extends KuduRpc<OperationResponse>
{
     if (this.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
       builder.setPropagatedTimestamp(this.propagatedTimestamp);
     }
-    return toChannelBuffer(header, builder.build());
+    return builder.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index 26a7f58..f56b8b5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -18,7 +18,6 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
@@ -48,11 +47,8 @@ class PingRequest extends KuduRpc<PingResponse> {
   }
 
   @Override
-  ChannelBuffer serialize(Message header) {
-    assert header.isInitialized();
-    final Master.PingRequestPB.Builder builder =
-        Master.PingRequestPB.newBuilder();
-    return toChannelBuffer(header, builder.build());
+  Message createRequestPB() {
+    return Master.PingRequestPB.getDefaultInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
index fb96185..2efad45 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
@@ -18,9 +18,13 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.rpc.RpcHeader.RequestHeader;
 
@@ -30,6 +34,8 @@ import org.apache.kudu.rpc.RpcHeader.RequestHeader;
  * for serializing these instances into wire-format-compatible buffers.
  */
 class RpcOutboundMessage {
+  private static final Logger LOG = LoggerFactory.getLogger(RpcOutboundMessage.class);
+
   private final RequestHeader header;
   private final Message body;
 
@@ -46,6 +52,13 @@ class RpcOutboundMessage {
     return body;
   }
 
+  @Override
+  public String toString() {
+    // TODO(todd): should this redact? it's only used at TRACE level, so hopefully OK.
+    return "RpcOutboundMessage[header={" + TextFormat.shortDebugString(header) +
+        "}, body={" + TextFormat.shortDebugString(body) + "}]";
+  }
+
   /**
    * Netty encoder implementation to serialize outbound messages.
    */
@@ -57,6 +70,9 @@ class RpcOutboundMessage {
         return obj;
       }
       RpcOutboundMessage msg = (RpcOutboundMessage)obj;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{}: sending RPC {}", chan, msg);
+      }
       // TODO(todd): move this impl into this class and remove external
       // callers.
       return KuduRpc.toChannelBuffer(msg.getHeader(), msg.getBody());

http://git-wip-us.apache.org/repos/asf/kudu/blob/79ffb17d/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index cf2aef8..cbae767 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -163,7 +163,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     if (!rpc.deadlineTracker.hasDeadline()) {
       LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
     }
-    Pair<ChannelBuffer, Integer> encodedRpcAndId = null;
+    RpcOutboundMessage outbound = null;
     if (chan != null) {
       if (!rpc.getRequiredFeatures().isEmpty() &&
           !negotiationResult.serverFeatures.contains(
@@ -174,14 +174,14 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
         // TODO(todd): this should return here. We seem to lack test coverage!
       }
 
-      encodedRpcAndId = encode(rpc);
-      if (encodedRpcAndId == null) {  // Error during encoding.
+      outbound = encode(rpc);
+      if (outbound == null) {  // Error during encoding.
         return;  // Stop here.  RPC has been failed already.
       }
 
       final Channel chan = this.chan;  // Volatile read.
       if (chan != null) {  // Double check if we disconnected during encode().
-        Channels.write(chan, encodedRpcAndId.getFirst());
+        Channels.write(chan, outbound);
         return;
       }
     }
@@ -196,8 +196,9 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
         // cleanup() already took care of calling failOrRetryRpc() for us. If it did, the
entry we
         // added in rpcsInflight will be missing. If not, we have to call failOrRetryRpc()
         // ourselves after this synchronized block.
-        // `encodedRpcAndId` is null iff `chan` is null.
-        if (encodedRpcAndId == null || rpcsInflight.containsKey(encodedRpcAndId.getSecond()))
{
+        // `outbound` is null iff `chan` is null.
+        if (outbound == null ||
+            rpcsInflight.containsKey(outbound.getHeader().getCallId())) {
           failRpc = true;
         }
       } else {
@@ -222,9 +223,9 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     }
   }
 
-  private <R> Pair<ChannelBuffer, Integer> encode(final KuduRpc<R> rpc)
{
+  private <R> RpcOutboundMessage encode(final KuduRpc<R> rpc) {
     final int rpcid = this.rpcid.incrementAndGet();
-    ChannelBuffer payload;
+    RpcOutboundMessage outbound;
     final String service = rpc.serviceName();
     final String method = rpc.method();
     try {
@@ -262,9 +263,9 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
         headerBuilder.setRequestId(requestIdBuilder);
       }
 
-      payload = rpc.serialize(headerBuilder.build());
+      outbound = new RpcOutboundMessage(headerBuilder.build(), rpc.createRequestPB());
     } catch (Exception e) {
-      LOG.error("Uncaught exception while serializing RPC: " + rpc, e);
+      LOG.error("Uncaught exception while constructing RPC request: " + rpc, e);
       rpc.errback(e);  // Make the RPC fail with the exception.
       return null;
     }
@@ -280,12 +281,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
       oldrpc.errback(new NonRecoverableException(statusIllegalState));
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getPeerUuidLoggingString() + chan + " Sending RPC #" + rpcid +
-          ", payload=" + payload);
-    }
-
-    return new Pair<>(payload, rpcid);
+    return outbound;
   }
 
   /**


Mime
View raw message