kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [26/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)
Date Mon, 25 Jul 2016 17:15:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
new file mode 100644
index 0000000..f1a9075
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.google.protobuf.ByteString;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.Common;
+import org.kududb.consensus.Metadata;
+import org.kududb.master.Master;
+import org.kududb.util.NetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Class grouping the callback and the errback for GetMasterRegistration calls
+ * made in getMasterTableLocationsPB.
+ */
+@InterfaceAudience.Private
+final class GetMasterRegistrationReceived {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GetMasterRegistrationReceived.class);
+
+  private final List<HostAndPort> masterAddrs;
+  private final Deferred<Master.GetTableLocationsResponsePB> responseD;
+  private final int numMasters;
+
+  // Used to avoid calling 'responseD' twice.
+  private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
+
+  // Number of responses we've receives: used to tell whether or not we've received
+  // errors/replies from all of the masters, or if there are any
+  // GetMasterRegistrationRequests still pending.
+  private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
+
+  // Exceptions received so far: kept for debugging purposes.
+  // (see: NoLeaderMasterFoundException#create() for how this is used).
+  private final List<Exception> exceptionsReceived =
+      Collections.synchronizedList(new ArrayList<Exception>());
+
+  /**
+   * Creates an object that holds the state needed to retrieve master table's location.
+   * @param masterAddrs Addresses of all master replicas that we want to retrieve the
+   *                    registration from.
+   * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for
+   *                  the master table.
+   */
+  public GetMasterRegistrationReceived(List<HostAndPort> masterAddrs,
+                                       Deferred<Master.GetTableLocationsResponsePB> responseD) {
+    this.masterAddrs = masterAddrs;
+    this.responseD = responseD;
+    this.numMasters = masterAddrs.size();
+  }
+
+  /**
+   * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+   * @see GetMasterRegistrationCB
+   * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must
+   *                    be valid.
+   * @return The callback object that can be added to the RPC request.
+   */
+  public Callback<Void, GetMasterRegistrationResponse> callbackForNode(HostAndPort hostAndPort) {
+    return new GetMasterRegistrationCB(hostAndPort);
+  }
+
+  /**
+   * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+   * @see GetMasterRegistrationErrCB
+   * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging
+   *                    purposes.
+   * @return The errback object that can be added to the RPC request.
+   */
+  public Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
+    return new GetMasterRegistrationErrCB(hostAndPort);
+  }
+
+  /**
+   * Checks if we've already received a response or an exception from every master that
+   * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
+   * (that is, 'responseD' was never called) -- pass a {@link NoLeaderMasterFoundException}
+   * to responseD.
+   */
+  private void incrementCountAndCheckExhausted() {
+    if (countResponsesReceived.incrementAndGet() == numMasters) {
+      if (responseDCalled.compareAndSet(false, true)) {
+        boolean allUnrecoverable = true;
+        for (Exception ex : exceptionsReceived) {
+          if (!(ex instanceof NonRecoverableException)) {
+            allUnrecoverable = false;
+            break;
+          }
+        }
+        String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+        // Doing a negative check because allUnrecoverable stays true if there are no exceptions.
+        if (!allUnrecoverable) {
+          String message = "Master config (" + allHosts + ") has no leader.";
+          Exception ex;
+          if (exceptionsReceived.isEmpty()) {
+            LOG.warn("None of the provided masters (" + allHosts + ") is a leader, will retry.");
+            ex = new NoLeaderMasterFoundException(Status.ServiceUnavailable(message));
+          } else {
+            LOG.warn("Unable to find the leader master (" + allHosts + "), will retry");
+            String joinedMsg = message + ". Exceptions received: " +
+                Joiner.on(",").join(
+                    Lists.transform(exceptionsReceived, Functions.toStringFunction()));
+            Status statusServiceUnavailable = Status.ServiceUnavailable(joinedMsg);
+            ex = new NoLeaderMasterFoundException(
+                statusServiceUnavailable,
+                exceptionsReceived.get(exceptionsReceived.size() - 1));
+          }
+          responseD.callback(ex);
+        } else {
+          Status statusConfigurationError = Status.ConfigurationError(
+              "Couldn't find a valid master in (" + allHosts +
+                  "), exceptions: " + exceptionsReceived);
+          // This will stop retries.
+          responseD.callback(new NonRecoverableException(statusConfigurationError));
+        }
+      }
+    }
+  }
+
+  /**
+   * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+   * If a request (paired to a specific master) returns a reply that indicates it's a leader,
+   * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
+   * object containing the leader's RPC address.
+   * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
+   * the number of masters, pass {@link NoLeaderMasterFoundException} into
+   * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
+   */
+  final class GetMasterRegistrationCB implements Callback<Void, GetMasterRegistrationResponse> {
+    private final HostAndPort hostAndPort;
+
+    public GetMasterRegistrationCB(HostAndPort hostAndPort) {
+      this.hostAndPort = hostAndPort;
+    }
+
+    @Override
+    public Void call(GetMasterRegistrationResponse r) throws Exception {
+      Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+          Master.TabletLocationsPB.ReplicaPB.newBuilder();
+
+      Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+      tsInfoBuilder.addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort));
+      tsInfoBuilder.setPermanentUuid(r.getInstanceId().getPermanentUuid());
+      replicaBuilder.setTsInfo(tsInfoBuilder);
+      if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
+        replicaBuilder.setRole(r.getRole());
+        Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder();
+        locationBuilder.setPartition(
+            Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
+                                           .setPartitionKeyEnd(ByteString.EMPTY));
+        locationBuilder.setTabletId(
+            ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
+        locationBuilder.addReplicas(replicaBuilder);
+        // No one else has called this before us.
+        if (responseDCalled.compareAndSet(false, true)) {
+          responseD.callback(
+              Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
+                  locationBuilder.build()).build()
+          );
+        } else {
+          LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
+              hostAndPort.toString());
+        }
+      } else {
+        incrementCountAndCheckExhausted();
+      }
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "get master registration for " + hostAndPort.toString();
+    }
+  }
+
+  /**
+   * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+   * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
+   * the count is equal to the number of masters and no one else had called 'responseD' before,
+   * pass a {@link NoLeaderMasterFoundException} into 'responseD'; otherwise, do
+   * nothing.
+   */
+  final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
+    private final HostAndPort hostAndPort;
+
+    public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
+      this.hostAndPort = hostAndPort;
+    }
+
+    @Override
+    public Void call(Exception e) throws Exception {
+      LOG.warn("Error receiving a response from: " + hostAndPort, e);
+      exceptionsReceived.add(e);
+      incrementCountAndCheckExhausted();
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "get master registration errback for " + hostAndPort.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
new file mode 100644
index 0000000..bc3d81e
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.protobuf.Message;
+import static org.kududb.consensus.Metadata.*;
+import static org.kududb.master.Master.*;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Package-private RPC that can only go to master.
+ */
+@InterfaceAudience.Private
+public class GetMasterRegistrationRequest extends KuduRpc<GetMasterRegistrationResponse> {
+  private static final String GET_MASTER_REGISTRATION = "GetMasterRegistration";
+
+  public GetMasterRegistrationRequest(KuduTable masterTable) {
+    super(masterTable);
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    assert header.isInitialized();
+    final GetMasterRegistrationRequestPB.Builder builder =
+        GetMasterRegistrationRequestPB.newBuilder();
+    return toChannelBuffer(header, builder.build());
+  }
+
+  @Override
+  String serviceName() { return MASTER_SERVICE_NAME; }
+
+  @Override
+  String method() {
+    return GET_MASTER_REGISTRATION;
+  }
+
+  @Override
+  Pair<GetMasterRegistrationResponse, Object> deserialize(CallResponse callResponse,
+                                                          String tsUUID) throws Exception {
+    final GetMasterRegistrationResponsePB.Builder respBuilder =
+        GetMasterRegistrationResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
+    if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
+        MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
+      role = respBuilder.getRole();
+    }
+    GetMasterRegistrationResponse response = new GetMasterRegistrationResponse(
+        deadlineTracker.getElapsedMillis(),
+        tsUUID,
+        role,
+        respBuilder.getRegistration(),
+        respBuilder.getInstanceId());
+    return new Pair<GetMasterRegistrationResponse, Object>(
+        response, respBuilder.hasError() ? respBuilder.getError() : null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
new file mode 100644
index 0000000..292710c
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.WireProtocol;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.consensus.Metadata;
+import org.kududb.master.Master;
+
+/**
+ * Response for {@link GetMasterRegistrationRequest}.
+ */
+@InterfaceAudience.Private
+public class GetMasterRegistrationResponse extends KuduRpcResponse {
+
+  private final Metadata.RaftPeerPB.Role role;
+  private final WireProtocol.ServerRegistrationPB serverRegistration;
+  private final WireProtocol.NodeInstancePB instanceId;
+
+  /**
+   * Describes a response to a {@link GetMasterRegistrationRequest}, built from
+   * {@link Master.GetMasterRegistrationResponsePB}.
+   *
+   * @param role Master's role in the config.
+   * @param serverRegistration server registration (RPC and HTTP addresses) for this master.
+   * @param instanceId Node instance (permanent uuid and
+   */
+  public GetMasterRegistrationResponse(long elapsedMillis, String tsUUID,
+                                       Metadata.RaftPeerPB.Role role,
+                                       WireProtocol.ServerRegistrationPB serverRegistration,
+                                       WireProtocol.NodeInstancePB instanceId) {
+    super(elapsedMillis, tsUUID);
+    this.role = role;
+    this.serverRegistration = serverRegistration;
+    this.instanceId = instanceId;
+  }
+
+  /**
+   * Returns this master's role in the config.
+   *
+   * @see Metadata.RaftPeerPB.Role
+   * @return Node's role in the cluster, or FOLLOWER if the node is not initialized.
+   */
+  public Metadata.RaftPeerPB.Role getRole() {
+    return role;
+  }
+
+  /**
+   * Returns the server registration (list of RPC and HTTP ports) for this master.
+   *
+   * @return The {@link WireProtocol.ServerRegistrationPB} object for this master.
+   */
+  public WireProtocol.ServerRegistrationPB getServerRegistration() {
+    return serverRegistration;
+  }
+
+  /**
+   * The node instance (initial sequence number and permanent uuid) for this master.
+   *
+   * @return The {@link WireProtocol.NodeInstancePB} object for this master.
+   */
+  public WireProtocol.NodeInstancePB getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public String toString() {
+    return "GetMasterRegistrationResponse{" +
+        "role=" + role +
+        ", serverRegistration=" + serverRegistration +
+        ", instanceId=" + instanceId +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
new file mode 100644
index 0000000..616b523
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.ZeroCopyLiteralByteString;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.master.Master;
+import org.kududb.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Package-private RPC that can only go to a master.
+ */
+@InterfaceAudience.Private
+class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponsePB> {
+
+  private final byte[] startPartitionKey;
+  private final byte[] endKey;
+  private final String tableId;
+
+  GetTableLocationsRequest(KuduTable table, byte[] startPartitionKey,
+                           byte[] endPartitionKey, String tableId) {
+    super(table);
+    if (startPartitionKey != null && endPartitionKey != null
+        && Bytes.memcmp(startPartitionKey, endPartitionKey) > 0) {
+      throw new IllegalArgumentException(
+          "The start partition key must be smaller or equal to the end partition key");
+    }
+    this.startPartitionKey = startPartitionKey;
+    this.endKey = endPartitionKey;
+    this.tableId = tableId;
+  }
+
+  @Override
+  String serviceName() { return MASTER_SERVICE_NAME; }
+
+  @Override
+  String method() {
+    return "GetTableLocations";
+  }
+
+  @Override
+  Pair<Master.GetTableLocationsResponsePB, Object> deserialize(
+      final CallResponse callResponse, String tsUUID)
+      throws Exception {
+    Master.GetTableLocationsResponsePB.Builder builder = Master.GetTableLocationsResponsePB
+        .newBuilder();
+    readProtobuf(callResponse.getPBMessage(), builder);
+    Master.GetTableLocationsResponsePB resp = builder.build();
+    return new Pair<Master.GetTableLocationsResponsePB, Object>(
+        resp, builder.hasError() ? builder.getError() : null);
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    final Master.GetTableLocationsRequestPB.Builder builder = Master
+        .GetTableLocationsRequestPB.newBuilder();
+    builder.setTable(Master.TableIdentifierPB.newBuilder().
+        setTableId(ByteString.copyFromUtf8(tableId)));
+    if (startPartitionKey != null) {
+      builder.setPartitionKeyStart(ZeroCopyLiteralByteString.wrap(startPartitionKey));
+    }
+    if (endKey != null) {
+      builder.setPartitionKeyEnd(ZeroCopyLiteralByteString.wrap(endKey));
+    }
+    return toChannelBuffer(header, builder.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
new file mode 100644
index 0000000..bb17816
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.protobuf.Message;
+import static org.kududb.master.Master.*;
+
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * RPC to fetch a table's schema
+ */
+@InterfaceAudience.Private
+public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
+  static final String GET_TABLE_SCHEMA = "GetTableSchema";
+  private final String name;
+
+
+  GetTableSchemaRequest(KuduTable masterTable, String name) {
+    super(masterTable);
+    this.name = name;
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    assert header.isInitialized();
+    final GetTableSchemaRequestPB.Builder builder = GetTableSchemaRequestPB.newBuilder();
+    TableIdentifierPB tableID =
+        TableIdentifierPB.newBuilder().setTableName(name).build();
+    builder.setTable(tableID);
+    return toChannelBuffer(header, builder.build());
+  }
+
+  @Override
+  String serviceName() { return MASTER_SERVICE_NAME; }
+
+  @Override
+  String method() {
+    return GET_TABLE_SCHEMA;
+  }
+
+  @Override
+  Pair<GetTableSchemaResponse, Object> deserialize(CallResponse callResponse,
+                                                   String tsUUID) throws Exception {
+    final GetTableSchemaResponsePB.Builder respBuilder = GetTableSchemaResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    Schema schema = ProtobufHelper.pbToSchema(respBuilder.getSchema());
+    GetTableSchemaResponse response = new GetTableSchemaResponse(
+        deadlineTracker.getElapsedMillis(),
+        tsUUID,
+        schema,
+        respBuilder.getTableId().toStringUtf8(),
+        ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema),
+        respBuilder.getCreateTableDone());
+    return new Pair<GetTableSchemaResponse, Object>(
+        response, respBuilder.hasError() ? respBuilder.getError() : null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
new file mode 100644
index 0000000..72ac68e
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class GetTableSchemaResponse extends KuduRpcResponse {
+
+  private final Schema schema;
+  private final PartitionSchema partitionSchema;
+  private final boolean createTableDone;
+  private final String tableId;
+
+  /**
+   * @param ellapsedMillis Time in milliseconds since RPC creation to now
+   * @param schema the table's schema
+   * @param partitionSchema the table's partition schema
+   */
+  GetTableSchemaResponse(long ellapsedMillis,
+                         String tsUUID,
+                         Schema schema,
+                         String tableId,
+                         PartitionSchema partitionSchema,
+                         boolean createTableDone) {
+    super(ellapsedMillis, tsUUID);
+    this.schema = schema;
+    this.partitionSchema = partitionSchema;
+    this.createTableDone = createTableDone;
+    this.tableId = tableId;
+  }
+
+  /**
+   * Get the table's schema.
+   * @return Table's schema
+   */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /**
+   * Get the table's partition schema.
+   * @return the table's partition schema
+   */
+  public PartitionSchema getPartitionSchema() {
+    return partitionSchema;
+  }
+
+  /**
+   * Tells if the original CreateTable call has completed and the tablets are ready.
+   * @return true if the table is created, otherwise false
+   */
+  public boolean isCreateTableDone() {
+    return createTableDone;
+  }
+
+  /**
+   * Get the table's unique identifier.
+   * @return the table's tableId
+   */
+  public String getTableId() {
+    return tableId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/HasFailedRpcException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/HasFailedRpcException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/HasFailedRpcException.java
new file mode 100644
index 0000000..08dda52
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/HasFailedRpcException.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Interface implemented by {@link KuduException}s that can tell you which
+ * RPC failed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface HasFailedRpcException {
+
+  /**
+   * Returns the RPC that caused this exception.
+   */
+  KuduRpc<?> getFailedRpc();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/IPCUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IPCUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IPCUtil.java
new file mode 100644
index 0000000..45240dd
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IPCUtil.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.client;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import org.kududb.annotations.InterfaceAudience;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Helper methods for RPCs.
+ */
+@InterfaceAudience.Private
+public class IPCUtil {
+  /**
+   * Write out header, param, and cell block if there is one.
+   * @param dos
+   * @param header
+   * @param param
+   * @return Total number of bytes written.
+   * @throws java.io.IOException
+   */
+  public static int write(final OutputStream dos, final Message header, final Message param)
+      throws IOException {
+    // Must calculate total size and write that first so other side can read it all in in one
+    // swoop.  This is dictated by how the server is currently written.  Server needs to change
+    // if we are to be able to write without the length prefixing.
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
+    return write(dos, header, param, totalSize);
+  }
+
+  private static int write(final OutputStream dos, final Message header, final Message param,
+                           final int totalSize)
+      throws IOException {
+    // I confirmed toBytes does same as say DataOutputStream#writeInt.
+    dos.write(toBytes(totalSize));
+    header.writeDelimitedTo(dos);
+    if (param != null) param.writeDelimitedTo(dos);
+    dos.flush();
+    return totalSize;
+  }
+
+  /**
+   * @return Size on the wire when the two messages are written with writeDelimitedTo
+   */
+  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+    int totalSize = 0;
+    for (Message m: messages) {
+      if (m == null) continue;
+      totalSize += m.getSerializedSize();
+      totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
+    }
+    return totalSize;
+  }
+
+  public static byte[] toBytes(int val) {
+    byte [] b = new byte[4];
+    for(int i = 3; i > 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    b[0] = (byte) val;
+    return b;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/Insert.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Insert.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Insert.java
new file mode 100644
index 0000000..67b389f
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Insert.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Represents a single row insert. Instances of this class should not be reused.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Insert extends Operation {
+
+  Insert(KuduTable table) {
+    super(table);
+  }
+
+  @Override
+  ChangeType getChangeType() {
+    return ChangeType.INSERT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
new file mode 100644
index 0000000..ca161f5
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.protobuf.Message;
+import static org.kududb.master.Master.*;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * RPC used to check if an alter is running for the specified table
+ */
+@InterfaceAudience.Private
+class IsAlterTableDoneRequest extends KuduRpc<IsAlterTableDoneResponse> {
+
+  static final String IS_ALTER_TABLE_DONE = "IsAlterTableDone";
+  private final String name;
+
+
+  IsAlterTableDoneRequest(KuduTable masterTable, String name) {
+    super(masterTable);
+    this.name = name;
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    assert header.isInitialized();
+    final IsAlterTableDoneRequestPB.Builder builder = IsAlterTableDoneRequestPB.newBuilder();
+    TableIdentifierPB tableID =
+        TableIdentifierPB.newBuilder().setTableName(name).build();
+    builder.setTable(tableID);
+    return toChannelBuffer(header, builder.build());
+  }
+
+  @Override
+  String serviceName() { return MASTER_SERVICE_NAME; }
+
+  @Override
+  String method() {
+    return IS_ALTER_TABLE_DONE;
+  }
+
+  @Override
+  Pair<IsAlterTableDoneResponse, Object> deserialize(final CallResponse callResponse,
+                                                       String tsUUID) throws Exception {
+    final IsAlterTableDoneResponsePB.Builder respBuilder = IsAlterTableDoneResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    IsAlterTableDoneResponse resp = new IsAlterTableDoneResponse(deadlineTracker.getElapsedMillis(),
+        tsUUID, respBuilder.getDone());
+    return new Pair<IsAlterTableDoneResponse, Object>(
+        resp, respBuilder.hasError() ? respBuilder.getError() : null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneResponse.java
new file mode 100644
index 0000000..356c085
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneResponse.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Response to a isAlterTableDone command to use to know if an alter table is currently running on
+ * the specified table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IsAlterTableDoneResponse extends KuduRpcResponse {
+
+  private final boolean done;
+
+  IsAlterTableDoneResponse(long elapsedMillis, String tsUUID, boolean done) {
+    super(elapsedMillis, tsUUID);
+    this.done = done;
+  }
+
+  /**
+   * Tells if the table is done being altered or not.
+   * @return whether the table alter is done
+   */
+  public boolean isDone() {
+    return done;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
new file mode 100644
index 0000000..8e4679c
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.master.Master;
+import org.kududb.util.Pair;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Package-private RPC that can only go to a master.
+ */
+@InterfaceAudience.Private
+class IsCreateTableDoneRequest extends KuduRpc<Master.IsCreateTableDoneResponsePB> {
+
+  private final String tableId;
+
+  IsCreateTableDoneRequest(KuduTable table, String tableId) {
+    super(table);
+    this.tableId = tableId;
+  }
+
+  @Override
+  String serviceName() { return MASTER_SERVICE_NAME; }
+
+  @Override
+  String method() {
+    return "IsCreateTableDone";
+  }
+
+  @Override
+  Pair<Master.IsCreateTableDoneResponsePB, Object> deserialize(
+      final CallResponse callResponse, String tsUUID) throws Exception {
+    Master.IsCreateTableDoneResponsePB.Builder builder = Master.IsCreateTableDoneResponsePB
+        .newBuilder();
+    readProtobuf(callResponse.getPBMessage(), builder);
+    Master.IsCreateTableDoneResponsePB resp = builder.build();
+    return new Pair<Master.IsCreateTableDoneResponsePB, Object>(
+        resp, builder.hasError() ? builder.getError() : null);
+  }
+
+  @Override
+  ChannelBuffer serialize(Message header) {
+    final Master.IsCreateTableDoneRequestPB.Builder builder = Master
+        .IsCreateTableDoneRequestPB.newBuilder();
+    builder.setTable(Master.TableIdentifierPB.newBuilder().setTableId(
+        ByteString.copyFromUtf8(tableId)));
+    return toChannelBuffer(header, builder.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
new file mode 100644
index 0000000..2fbde58
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.google.common.primitives.UnsignedLongs;
+import com.sangupta.murmur.Murmur2;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.client.PartitionSchema.HashBucketSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+/**
+ * Utility class for encoding rows into primary and partition keys.
+ */
+@InterfaceAudience.Private
+class KeyEncoder {
+
+  private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+  /**
+   * Encodes the primary key of the row.
+   *
+   * @param row the row to encode
+   * @return the encoded primary key of the row
+   */
+  public byte[] encodePrimaryKey(final PartialRow row) {
+    buf.reset();
+
+    final Schema schema = row.getSchema();
+    for (int columnIdx = 0; columnIdx < schema.getPrimaryKeyColumnCount(); columnIdx++) {
+      final boolean isLast = columnIdx + 1 == schema.getPrimaryKeyColumnCount();
+      encodeColumn(row, columnIdx, isLast);
+    }
+    return extractByteArray();
+  }
+
+  /**
+   * Encodes the provided row into a partition key according to the partition schema.
+   *
+   * @param row the row to encode
+   * @param partitionSchema the partition schema describing the table's partitioning
+   * @return an encoded partition key
+   */
+  public byte[] encodePartitionKey(PartialRow row, PartitionSchema partitionSchema) {
+    buf.reset();
+    if (!partitionSchema.getHashBucketSchemas().isEmpty()) {
+      ByteBuffer bucketBuf = ByteBuffer.allocate(4 * partitionSchema.getHashBucketSchemas().size());
+      bucketBuf.order(ByteOrder.BIG_ENDIAN);
+
+      for (final HashBucketSchema hashBucketSchema : partitionSchema.getHashBucketSchemas()) {
+        encodeColumns(row, hashBucketSchema.getColumnIds());
+        byte[] encodedColumns = extractByteArray();
+        long hash = Murmur2.hash64(encodedColumns,
+                                   encodedColumns.length,
+                                   hashBucketSchema.getSeed());
+        int bucket = (int) UnsignedLongs.remainder(hash, hashBucketSchema.getNumBuckets());
+        bucketBuf.putInt(bucket);
+      }
+
+      assert bucketBuf.arrayOffset() == 0;
+      buf.write(bucketBuf.array(), 0, bucketBuf.position());
+    }
+
+    encodeColumns(row, partitionSchema.getRangeSchema().getColumns());
+    return extractByteArray();
+  }
+
+  /**
+   * Encodes a sequence of columns from the row.
+   * @param row the row containing the columns to encode
+   * @param columnIds the IDs of each column to encode
+   */
+  private void encodeColumns(PartialRow row, List<Integer> columnIds) {
+    for (int i = 0; i < columnIds.size(); i++) {
+      boolean isLast = i + 1 == columnIds.size();
+      encodeColumn(row, row.getSchema().getColumnIndex(columnIds.get(i)), isLast);
+    }
+  }
+
+  /**
+   * Encodes a single column of a row.
+   * @param row the row being encoded
+   * @param columnIdx the column index of the column to encode
+   * @param isLast whether the column is the last component of the key
+   */
+  private void encodeColumn(PartialRow row, int columnIdx, boolean isLast) {
+    final Schema schema = row.getSchema();
+    final ColumnSchema column = schema.getColumnByIndex(columnIdx);
+    if (!row.isSet(columnIdx)) {
+      throw new IllegalStateException(String.format("Primary key column %s is not set",
+                                                    column.getName()));
+    }
+    final Type type = column.getType();
+
+    if (type == Type.STRING || type == Type.BINARY) {
+      addBinaryComponent(row.getVarLengthData().get(columnIdx), isLast);
+    } else {
+      addComponent(row.getRowAlloc(),
+                   schema.getColumnOffset(columnIdx),
+                   type.getSize(),
+                   type);
+    }
+  }
+
+  /**
+   * Encodes a byte buffer into the key.
+   * @param value the value to encode
+   * @param isLast whether the value is the final component in the key
+   */
+  private void addBinaryComponent(ByteBuffer value, boolean isLast) {
+    value.reset();
+
+    // TODO find a way to not have to read byte-by-byte that doesn't require extra copies. This is
+    // especially slow now that users can pass direct byte buffers.
+    while (value.hasRemaining()) {
+      byte currentByte = value.get();
+      buf.write(currentByte);
+      if (!isLast && currentByte == 0x00) {
+        // If we're a middle component of a composite key, we need to add a \x00
+        // at the end in order to separate this component from the next one. However,
+        // if we just did that, we'd have issues where a key that actually has
+        // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
+        // encode \x00 as \x00\x01. -- key_encoder.h
+        buf.write(0x01);
+      }
+    }
+
+    if (!isLast) {
+      buf.write(0x00);
+      buf.write(0x00);
+    }
+  }
+
+  /**
+   * Encodes a value of the given type into the key.
+   * @param value the value to encode
+   * @param offset the offset into the {@code value} buffer that the value begins
+   * @param len the length of the value
+   * @param type the type of the value to encode
+   */
+  private void addComponent(byte[] value, int offset, int len, Type type) {
+    switch (type) {
+      case INT8:
+      case INT16:
+      case INT32:
+      case INT64:
+      case TIMESTAMP:
+        // Picking the first byte because big endian.
+        byte lastByte = value[offset + (len - 1)];
+        lastByte = Bytes.xorLeftMostBit(lastByte);
+        buf.write(lastByte);
+        if (len > 1) {
+          for (int i = len - 2; i >= 0; i--) {
+            buf.write(value[offset + i]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(String.format(
+            "The column type %s is not a valid key component type", type));
+    }
+  }
+
+  /**
+   * Returns the encoded key, and resets the key encoder to be used for another key.
+   * @return the encoded key which has been built through calls to {@link #addComponent}
+   */
+  private byte[] extractByteArray() {
+    byte[] bytes = buf.toByteArray();
+    buf.reset();
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
new file mode 100644
index 0000000..cfd4662
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import com.stumbleupon.async.Deferred;
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * A synchronous and thread-safe client for Kudu.
+ * <p>
+ * This class acts as a wrapper around {@link AsyncKuduClient}. The {@link Deferred} objects are
+ * joined against using the default admin operation timeout
+ * (see {@link org.kududb.client.KuduClient.KuduClientBuilder#defaultAdminOperationTimeoutMs(long)} (long)}).
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduClient implements AutoCloseable {
+
+  public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduClient.class);
+
+  private final AsyncKuduClient asyncClient;
+
+  KuduClient(AsyncKuduClient asyncClient) {
+    this.asyncClient = asyncClient;
+  }
+
+  /**
+   * Create a table on the cluster with the specified name, schema, and table configurations.
+   * @param name the table's name
+   * @param schema the table's schema
+   * @param builder a builder containing the table's configurations
+   * @return an object to communicate with the created table
+   * @throws KuduException if anything went wrong
+   */
+  public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
+      throws KuduException {
+    Deferred<KuduTable> d = asyncClient.createTable(name, schema, builder);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Delete a table on the cluster with the specified name.
+   * @param name the table's name
+   * @return an rpc response object
+   * @throws KuduException if anything went wrong
+   */
+  public DeleteTableResponse deleteTable(String name) throws KuduException {
+    Deferred<DeleteTableResponse> d = asyncClient.deleteTable(name);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Alter a table on the cluster as specified by the builder.
+   *
+   * When the method returns it only indicates that the master accepted the alter
+   * command, use {@link KuduClient#isAlterTableDone(String)} to know when the alter finishes.
+   * @param name the table's name, if this is a table rename then the old table name must be passed
+   * @param ato the alter table builder
+   * @return an rpc response object
+   * @throws KuduException if anything went wrong
+   */
+  public AlterTableResponse alterTable(String name, AlterTableOptions ato) throws KuduException {
+    Deferred<AlterTableResponse> d = asyncClient.alterTable(name, ato);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Helper method that checks and waits until the completion of an alter command.
+   * It will block until the alter command is done or the timeout is reached.
+   * @param name Table's name, if the table was renamed then that name must be checked against
+   * @return a boolean indicating if the table is done being altered
+   * @throws KuduException for any error returned by sending RPCs to the master
+   */
+  public boolean isAlterTableDone(String name) throws KuduException {
+    long totalSleepTime = 0;
+    while (totalSleepTime < getDefaultAdminOperationTimeoutMs()) {
+      long start = System.currentTimeMillis();
+
+      try {
+        Deferred<IsAlterTableDoneResponse> d = asyncClient.isAlterTableDone(name);
+        IsAlterTableDoneResponse response;
+
+        response = d.join(AsyncKuduClient.SLEEP_TIME);
+        if (response.isDone()) {
+          return true;
+        }
+
+        // Count time that was slept and see if we need to wait a little more.
+        long elapsed = System.currentTimeMillis() - start;
+        // Don't oversleep the deadline.
+        if (totalSleepTime + AsyncKuduClient.SLEEP_TIME > getDefaultAdminOperationTimeoutMs()) {
+          return false;
+        }
+        // elapsed can be bigger if we slept about 500ms
+        if (elapsed <= AsyncKuduClient.SLEEP_TIME) {
+          LOG.debug("Alter not done, sleep " + (AsyncKuduClient.SLEEP_TIME - elapsed) +
+              " and slept " + totalSleepTime);
+          Thread.sleep(AsyncKuduClient.SLEEP_TIME - elapsed);
+          totalSleepTime += AsyncKuduClient.SLEEP_TIME;
+        } else {
+          totalSleepTime += elapsed;
+        }
+      } catch (Exception ex) {
+        throw KuduException.transformException(ex);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the list of running tablet servers.
+   * @return a list of tablet servers
+   * @throws KuduException if anything went wrong
+   */
+  public ListTabletServersResponse listTabletServers() throws KuduException {
+    Deferred<ListTabletServersResponse> d = asyncClient.listTabletServers();
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Get the list of all the tables.
+   * @return a list of all the tables
+   * @throws KuduException if anything went wrong
+   */
+  public ListTablesResponse getTablesList() throws KuduException {
+    return getTablesList(null);
+  }
+
+  /**
+   * Get a list of table names. Passing a null filter returns all the tables. When a filter is
+   * specified, it only returns tables that satisfy a substring match.
+   * @param nameFilter an optional table name filter
+   * @return a deferred that contains the list of table names
+   * @throws KuduException if anything went wrong
+   */
+  public ListTablesResponse getTablesList(String nameFilter) throws KuduException {
+    Deferred<ListTablesResponse> d = asyncClient.getTablesList(nameFilter);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Test if a table exists.
+   * @param name a non-null table name
+   * @return true if the table exists, else false
+   * @throws KuduException if anything went wrong
+   */
+  public boolean tableExists(String name) throws KuduException {
+    Deferred<Boolean> d = asyncClient.tableExists(name);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Open the table with the given name. If the table was just created, this method will block until
+   * all its tablets have also been created.
+   * @param name table to open
+   * @return a KuduTable if the table exists
+   * @throws KuduException if anything went wrong
+   */
+  public KuduTable openTable(final String name) throws KuduException {
+    Deferred<KuduTable> d = asyncClient.openTable(name);
+    return joinAndHandleException(d);
+  }
+
+  /**
+   * Create a new session for interacting with the cluster.
+   * User is responsible for destroying the session object.
+   * This is a fully local operation (no RPCs or blocking).
+   * @return a synchronous wrapper around KuduSession.
+   */
+  public KuduSession newSession() {
+    AsyncKuduSession session = asyncClient.newSession();
+    return new KuduSession(session);
+  }
+
+  /**
+   * Check if statistics collection is enabled for this client.
+   * @return true if it is enabled, else false
+   */
+  public boolean isStatisticsEnabled() {
+    return asyncClient.isStatisticsEnabled();
+  }
+
+  /**
+   * Get the statistics object of this client.
+   *
+   * @return this client's Statistics object
+   * @throws IllegalStateException thrown if statistics collection has been disabled
+   */
+  public Statistics getStatistics() {
+    return asyncClient.getStatistics();
+  }
+
+  /**
+   * Creates a new {@link KuduScanner.KuduScannerBuilder} for a particular table.
+   * @param table the table you intend to scan.
+   * The string is assumed to use the platform's default charset.
+   * @return a new scanner builder for the table
+   */
+  public KuduScanner.KuduScannerBuilder newScannerBuilder(KuduTable table) {
+    return new KuduScanner.KuduScannerBuilder(asyncClient, table);
+  }
+
+  /**
+   * Creates a new {@link KuduScanToken.KuduScanTokenBuilder} for a particular table.
+   * Used for integrations with compute frameworks.
+   * @param table the table you intend to scan
+   * @return a new scan token builder for the table
+   */
+  public KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder(KuduTable table) {
+    return new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+  }
+
+  /**
+   * Analogous to {@link #shutdown()}.
+   * @throws KuduException if an error happens while closing the connections
+   */
+  @Override
+  public void close() throws KuduException {
+    try {
+      asyncClient.close();
+    } catch (Exception e) {
+      KuduException.transformException(e);
+    }
+  }
+
+  /**
+   * Performs a graceful shutdown of this instance.
+   * @throws KuduException if anything went wrong
+   */
+  public void shutdown() throws KuduException {
+    Deferred<ArrayList<Void>> d = asyncClient.shutdown();
+    joinAndHandleException(d);
+  }
+
+  /**
+   * Get the timeout used for operations on sessions and scanners.
+   * @return a timeout in milliseconds
+   */
+  public long getDefaultOperationTimeoutMs() {
+    return asyncClient.getDefaultOperationTimeoutMs();
+  }
+
+  /**
+   * Get the timeout used for admin operations.
+   * @return a timeout in milliseconds
+   */
+  public long getDefaultAdminOperationTimeoutMs() {
+    return asyncClient.getDefaultAdminOperationTimeoutMs();
+  }
+
+  // Helper method to handle joining and transforming the Exception we receive.
+  private <R> R joinAndHandleException(Deferred<R> deferred) throws KuduException {
+    try {
+      return deferred.join(getDefaultAdminOperationTimeoutMs());
+    } catch (Exception e) {
+      throw KuduException.transformException(e);
+    }
+  }
+
+  /**
+   * Builder class to use in order to connect to Kudu.
+   * All the parameters beyond those in the constructors are optional.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public final static class KuduClientBuilder {
+    private AsyncKuduClient.AsyncKuduClientBuilder clientBuilder;
+
+    /**
+     * Creates a new builder for a client that will connect to the specified masters.
+     * @param masterAddresses comma-separated list of "host:port" pairs of the masters
+     */
+    public KuduClientBuilder(String masterAddresses) {
+      clientBuilder = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses);
+    }
+
+    /**
+     * Creates a new builder for a client that will connect to the specified masters.
+     *
+     * <p>Here are some examples of recognized formats:
+     * <ul>
+     *   <li>example.com
+     *   <li>example.com:80
+     *   <li>192.0.2.1
+     *   <li>192.0.2.1:80
+     *   <li>[2001:db8::1]
+     *   <li>[2001:db8::1]:80
+     *   <li>2001:db8::1
+     * </ul>
+     *
+     * @param masterAddresses list of master addresses
+     */
+    public KuduClientBuilder(List<String> masterAddresses) {
+      clientBuilder = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses);
+    }
+
+    /**
+     * Sets the default timeout used for administrative operations (e.g. createTable, deleteTable,
+     * etc).
+     * Optional.
+     * If not provided, defaults to 30s.
+     * A value of 0 disables the timeout.
+     * @param timeoutMs a timeout in milliseconds
+     * @return this builder
+     */
+    public KuduClientBuilder defaultAdminOperationTimeoutMs(long timeoutMs) {
+      clientBuilder.defaultAdminOperationTimeoutMs(timeoutMs);
+      return this;
+    }
+
+    /**
+     * Sets the default timeout used for user operations (using sessions and scanners).
+     * Optional.
+     * If not provided, defaults to 30s.
+     * A value of 0 disables the timeout.
+     * @param timeoutMs a timeout in milliseconds
+     * @return this builder
+     */
+    public KuduClientBuilder defaultOperationTimeoutMs(long timeoutMs) {
+      clientBuilder.defaultOperationTimeoutMs(timeoutMs);
+      return this;
+    }
+
+    /**
+     * Sets the default timeout to use when waiting on data from a socket.
+     * Optional.
+     * If not provided, defaults to 10s.
+     * A value of 0 disables the timeout.
+     * @param timeoutMs a timeout in milliseconds
+     * @return this builder
+     */
+    public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
+      clientBuilder.defaultSocketReadTimeoutMs(timeoutMs);
+      return this;
+    }
+
+    /**
+     * Disable this client's collection of statistics.
+     * Statistics are enabled by default.
+     * @return this builder
+     */
+    public KuduClientBuilder disableStatistics() {
+      clientBuilder.disableStatistics();
+      return this;
+    }
+
+    /**
+     * Set the executors which will be used for the embedded Netty boss and workers.
+     * Optional.
+     * If not provided, uses a simple cached threadpool. If either argument is null,
+     * then such a thread pool will be used in place of that argument.
+     * Note: executor's max thread number must be greater or equal to corresponding
+     * worker count, or netty cannot start enough threads, and client will get stuck.
+     * If not sure, please just use CachedThreadPool.
+     */
+    public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor workerExecutor) {
+      clientBuilder.nioExecutors(bossExecutor, workerExecutor);
+      return this;
+    }
+
+    /**
+     * Set the maximum number of boss threads.
+     * Optional.
+     * If not provided, 1 is used.
+     */
+    public KuduClientBuilder bossCount(int bossCount) {
+      clientBuilder.bossCount(bossCount);
+      return this;
+    }
+
+    /**
+     * Set the maximum number of worker threads.
+     * Optional.
+     * If not provided, (2 * the number of available processors) is used.
+     */
+    public KuduClientBuilder workerCount(int workerCount) {
+      clientBuilder.workerCount(workerCount);
+      return this;
+    }
+
+    /**
+     * Creates a new client that connects to the masters.
+     * Doesn't block and won't throw an exception if the masters don't exist.
+     * @return a new asynchronous Kudu client
+     */
+    public KuduClient build() {
+      AsyncKuduClient client = clientBuilder.build();
+      return new KuduClient(client);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
new file mode 100644
index 0000000..4bd2eaf
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.kududb.client;
+
+import com.stumbleupon.async.DeferredGroupException;
+import com.stumbleupon.async.TimeoutException;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * The parent class of all exceptions sent by the Kudu client. This is the only exception you will
+ * see if you're using the non-async API, such as {@link KuduSession} instead of
+ * {@link AsyncKuduSession}.
+ *
+ * Each instance of this class has a {@link Status} which gives more information about the error.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@SuppressWarnings("serial")
+public abstract class KuduException extends IOException {
+
+  private final Status status;
+
+  /**
+   * Constructor.
+   * @param status object containing the reason for the exception
+   * trace.
+   */
+  KuduException(Status status) {
+    super(status.getMessage());
+    this.status = status;
+  }
+
+  /**
+   * Constructor.
+   * @param status object containing the reason for the exception
+   * @param cause The exception that caused this one to be thrown.
+   */
+  KuduException(Status status, Throwable cause) {
+    super(status.getMessage(), cause);
+    this.status = status;
+  }
+
+  /**
+   * Get the Status object for this exception.
+   * @return a status object indicating the reason for the exception
+   */
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * Inspects the given exception and transforms it into a KuduException.
+   * @param e generic exception we want to transform
+   * @return a KuduException that's easier to handle
+   */
+  static KuduException transformException(Exception e) {
+    if (e instanceof KuduException) {
+      return (KuduException) e;
+    } else if (e instanceof DeferredGroupException) {
+      // TODO anything we can do to improve on that kind of exception?
+    } else if (e instanceof TimeoutException) {
+      Status statusTimeout = Status.TimedOut(e.getMessage());
+      return new NonRecoverableException(statusTimeout, e);
+    } else if (e instanceof InterruptedException) {
+      // Need to reset the interrupt flag since we caught it but aren't handling it.
+      Thread.currentThread().interrupt();
+      Status statusAborted = Status.Aborted(e.getMessage());
+      return new NonRecoverableException(statusAborted, e);
+    }
+    Status status = Status.IOError(e.getMessage());
+    return new NonRecoverableException(status, e);
+  }
+}


Mime
View raw message