hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1459013 [2/8] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org...
Date Wed, 20 Mar 2013 19:36:47 GMT
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.IpcProtocol;
+
+
+import com.google.protobuf.Message;
+
+/**
+ * Save on reflection by keeping around method, method argument, and constructor instances
+ */
+class ReflectionCache {
+  private final Map<String, Message> methodArgCache = new ConcurrentHashMap<String, Message>();
+  private final Map<String, Method> methodInstanceCache = new ConcurrentHashMap<String, Method>();
+
+  public ReflectionCache() {
+    super();
+  }
+
+  Method getMethod(Class<? extends IpcProtocol> protocol, String methodName) {
+    Method method = this.methodInstanceCache.get(methodName);
+    if (method != null) return method;
+    Method [] methods = protocol.getMethods();
+    for (Method m : methods) {
+      if (m.getName().equals(methodName)) {
+        m.setAccessible(true);
+        this.methodInstanceCache.put(methodName, m);
+        return m;
+      }
+    }
+    return null;
+  }
+
+  Message getMethodArgType(Method method) throws Exception {
+    Message protoType = this.methodArgCache.get(method.getName());
+    if (protoType != null) return protoType;
+    Class<?>[] args = method.getParameterTypes();
+    Class<?> arg;
+    if (args.length == 2) {
+      // RpcController + Message in the method args
+      // (generated code from RPC bits in .proto files have RpcController)
+      arg = args[1];
+    } else if (args.length == 1) {
+      arg = args[0];
+    } else {
+      //unexpected
+      return null;
+    }
+    // Only the Message parameter matters: args[1], or args[0] when there is no RpcController
+    Method newInstMethod = arg.getMethod("getDefaultInstance");
+    newInstMethod.setAccessible(true);
+    protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+    this.methodArgCache.put(method.getName(), protoType);
+    return protoType;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A {@link RemoteException} with some extra information.  If source exception
+ * was a {@link DoNotRetryIOException}, {@link #isDoNotRetry()} will return true.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+public class RemoteWithExtrasException extends RemoteException {
+  private final String hostname;
+  private final int port;
+  private final boolean doNotRetry;
+
+  public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) {
+    this(className, msg, null, -1, doNotRetry);
+  }
+
+  public RemoteWithExtrasException(String className, String msg, final String hostname,
+      final int port, final boolean doNotRetry) {
+    super(className, msg);
+    this.hostname = hostname;
+    this.port = port;
+    this.doNotRetry = doNotRetry;
+  }
+
+  /**
+   * @return null if not set
+   */
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  /**
+   * @return -1 if not set
+   */
+  public int getPort() {
+    return this.port;
+  }
+
+  /**
+   * @return True if origin exception was a do not retry type.
+   */
+  public boolean isDoNotRetry() {
+    return this.doNotRetry;
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Mar 20 19:36:46 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
+
 import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
 
 import java.io.ByteArrayOutputStream;
@@ -34,7 +35,21 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
+
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -42,20 +57,15 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.MasterAdminProtocol;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.AdminProtocol;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ClientProtocol;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -88,12 +98,11 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.DeleteType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
@@ -114,16 +123,6 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 /**
  * Protobufs utility.
  */
@@ -342,43 +341,75 @@ public final class ProtobufUtil {
   }
 
   /**
-   * Convert a protocol buffer Mutate to a Put
+   * Convert a protocol buffer MutationProto to a Put.
    *
-   * @param proto the protocol buffer Mutate to convert
-   * @return the converted client Put
-   * @throws DoNotRetryIOException
+   * @param proto The protocol buffer MutationProto to convert
+   * @return A client Put.
+   * @throws IOException
    */
-  public static Put toPut(
-      final Mutate proto) throws DoNotRetryIOException {
-    MutateType type = proto.getMutateType();
-    assert type == MutateType.PUT : type.name();
-    byte[] row = proto.getRow().toByteArray();
-    long timestamp = HConstants.LATEST_TIMESTAMP;
-    if (proto.hasTimestamp()) {
-      timestamp = proto.getTimestamp();
-    }
-    Put put = new Put(row, timestamp);
-    put.setWriteToWAL(proto.getWriteToWAL());
-    for (NameBytesPair attribute: proto.getAttributeList()) {
-      put.setAttribute(attribute.getName(),
-        attribute.getValue().toByteArray());
-    }
-    for (ColumnValue column: proto.getColumnValueList()) {
-      byte[] family = column.getFamily().toByteArray();
-      for (QualifierValue qv: column.getQualifierValueList()) {
-        byte[] qualifier = qv.getQualifier().toByteArray();
-        if (!qv.hasValue()) {
-          throw new DoNotRetryIOException(
-            "Missing required field: qualifer value");
+  public static Put toPut(final MutationProto proto)
+  throws IOException {
+    return toPut(proto, null);
+  }
+
+  /**
+   * Convert a protocol buffer MutationProto to a Put.
+   *
+   * @param proto The protocol buffer MutationProto to convert
+   * @param cellScanner If non-null, the Cell data that goes with this proto.
+   * @return A client Put.
+   * @throws IOException
+   */
+  public static Put toPut(final MutationProto proto, final CellScanner cellScanner)
+  throws IOException {
+    // TODO: Server-side at least why do we convert back to the Client types?  Why not just pb it?
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.PUT: type.name();
+    byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+    long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP;
+    Put put = null;
+    int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+          TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+            " no cell returned: " + TextFormat.shortDebugString(proto));
+        }
+        Cell cell = cellScanner.current();
+        if (put == null) {
+          put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), timestamp);
         }
-        byte[] value = qv.getValue().toByteArray();
-        long ts = timestamp;
-        if (qv.hasTimestamp()) {
-          ts = qv.getTimestamp();
+        put.add(KeyValueUtil.ensureKeyValue(cell));
+      }
+    } else {
+      put = new Put(row, timestamp);
+      // The proto has the metadata and the data itself
+      for (ColumnValue column: proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv: column.getQualifierValueList()) {
+          byte[] qualifier = qv.getQualifier().toByteArray();
+          if (!qv.hasValue()) {
+            throw new DoNotRetryIOException(
+                "Missing required field: qualifer value");
+          }
+          byte[] value = qv.getValue().toByteArray();
+          long ts = timestamp;
+          if (qv.hasTimestamp()) {
+            ts = qv.getTimestamp();
+          }
+          put.add(family, qualifier, ts, value);
         }
-        put.add(family, qualifier, ts, value);
       }
     }
+    put.setWriteToWAL(proto.getWriteToWAL());
+    for (NameBytesPair attribute: proto.getAttributeList()) {
+      put.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+    }
     return put;
   }
 
@@ -387,74 +418,130 @@ public final class ProtobufUtil {
    *
    * @param proto the protocol buffer Mutate to convert
    * @return the converted client Delete
+   * @throws IOException
    */
-  public static Delete toDelete(final Mutate proto) {
-    MutateType type = proto.getMutateType();
-    assert type == MutateType.DELETE : type.name();
-    byte[] row = proto.getRow().toByteArray();
+  public static Delete toDelete(final MutationProto proto)
+  throws IOException {
+    return toDelete(proto, null);
+  }
+
+  /**
+   * Convert a protocol buffer MutationProto to a Delete
+   *
+   * @param proto the protocol buffer MutationProto to convert
+   * @param cellScanner if non-null, the data that goes with this delete.
+   * @return the converted client Delete
+   * @throws IOException
+   */
+  public static Delete toDelete(final MutationProto proto, final CellScanner cellScanner)
+  throws IOException {
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.DELETE : type.name();
+    byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
     long timestamp = HConstants.LATEST_TIMESTAMP;
     if (proto.hasTimestamp()) {
       timestamp = proto.getTimestamp();
     }
-    Delete delete = new Delete(row, timestamp);
-    delete.setWriteToWAL(proto.getWriteToWAL());
-    for (NameBytesPair attribute: proto.getAttributeList()) {
-      delete.setAttribute(attribute.getName(),
-        attribute.getValue().toByteArray());
-    }
-    for (ColumnValue column: proto.getColumnValueList()) {
-      byte[] family = column.getFamily().toByteArray();
-      for (QualifierValue qv: column.getQualifierValueList()) {
-        DeleteType deleteType = qv.getDeleteType();
-        byte[] qualifier = null;
-        if (qv.hasQualifier()) {
-          qualifier = qv.getQualifier().toByteArray();
+    Delete delete = null;
+    int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+          TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+            " no cell returned: " + TextFormat.shortDebugString(proto));
         }
-        long ts = HConstants.LATEST_TIMESTAMP;
-        if (qv.hasTimestamp()) {
-          ts = qv.getTimestamp();
+        Cell cell = cellScanner.current();
+        if (delete == null) {
+          delete =
+            new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), timestamp);
         }
-        if (deleteType == DeleteType.DELETE_ONE_VERSION) {
-          delete.deleteColumn(family, qualifier, ts);
-        } else if (deleteType == DeleteType.DELETE_MULTIPLE_VERSIONS) {
-          delete.deleteColumns(family, qualifier, ts);
-        } else {
-          delete.deleteFamily(family, ts);
+        delete.addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+      }
+    } else {
+      delete = new Delete(row, timestamp);
+      for (ColumnValue column: proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv: column.getQualifierValueList()) {
+          DeleteType deleteType = qv.getDeleteType();
+          byte[] qualifier = null;
+          if (qv.hasQualifier()) {
+            qualifier = qv.getQualifier().toByteArray();
+          }
+          long ts = HConstants.LATEST_TIMESTAMP;
+          if (qv.hasTimestamp()) {
+            ts = qv.getTimestamp();
+          }
+          if (deleteType == DeleteType.DELETE_ONE_VERSION) {
+            delete.deleteColumn(family, qualifier, ts);
+          } else if (deleteType == DeleteType.DELETE_MULTIPLE_VERSIONS) {
+            delete.deleteColumns(family, qualifier, ts);
+          } else {
+            delete.deleteFamily(family, ts);
+          }
         }
       }
     }
+    delete.setWriteToWAL(proto.getWriteToWAL());
+    for (NameBytesPair attribute: proto.getAttributeList()) {
+      delete.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+    }
     return delete;
   }
 
   /**
    * Convert a protocol buffer Mutate to an Append
-   *
+   * @param cellScanner the Cell data for this proto; must be non-null if its associated cell count is > 0
    * @param proto the protocol buffer Mutate to convert
    * @return the converted client Append
    * @throws DoNotRetryIOException
    */
-  public static Append toAppend(
-      final Mutate proto) throws DoNotRetryIOException {
-    MutateType type = proto.getMutateType();
-    assert type == MutateType.APPEND : type.name();
-    byte[] row = proto.getRow().toByteArray();
-    Append append = new Append(row);
-    append.setWriteToWAL(proto.getWriteToWAL());
-    for (NameBytesPair attribute: proto.getAttributeList()) {
-      append.setAttribute(attribute.getName(),
-        attribute.getValue().toByteArray());
-    }
-    for (ColumnValue column: proto.getColumnValueList()) {
-      byte[] family = column.getFamily().toByteArray();
-      for (QualifierValue qv: column.getQualifierValueList()) {
-        byte[] qualifier = qv.getQualifier().toByteArray();
-        if (!qv.hasValue()) {
-          throw new DoNotRetryIOException(
-            "Missing required field: qualifer value");
+  public static Append toAppend(final MutationProto proto, final CellScanner cellScanner)
+  throws DoNotRetryIOException {
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.APPEND : type.name();
+    byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+    Append append = null;
+    int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+          TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+            " no cell returned: " + TextFormat.shortDebugString(proto));
+        }
+        Cell cell = cellScanner.current();
+        if (append == null) {
+          append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
         }
-        byte[] value = qv.getValue().toByteArray();
-        append.add(family, qualifier, value);
+        append.add(KeyValueUtil.ensureKeyValue(cell));
       }
+    } else {
+      append = new Append(row);
+      for (ColumnValue column: proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv: column.getQualifierValueList()) {
+          byte[] qualifier = qv.getQualifier().toByteArray();
+          if (!qv.hasValue()) {
+            throw new DoNotRetryIOException(
+              "Missing required field: qualifer value");
+          }
+          byte[] value = qv.getValue().toByteArray();
+          append.add(family, qualifier, value);
+        }
+      }
+    }
+    append.setWriteToWAL(proto.getWriteToWAL());
+    for (NameBytesPair attribute: proto.getAttributeList()) {
+      append.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
     }
     return append;
   }
@@ -466,18 +553,18 @@ public final class ProtobufUtil {
    * @return the converted Mutation
    * @throws IOException
    */
-  public static Mutation toMutation(final Mutate proto) throws IOException {
-    MutateType type = proto.getMutateType();
-    if (type == MutateType.APPEND) {
-      return toAppend(proto);
+  public static Mutation toMutation(final MutationProto proto) throws IOException {
+    MutationType type = proto.getMutateType();
+    if (type == MutationType.APPEND) {
+      return toAppend(proto, null);
     }
-    if (type == MutateType.DELETE) {
-      return toDelete(proto);
+    if (type == MutationType.DELETE) {
+      return toDelete(proto, null);
     }
-    if (type == MutateType.PUT) {
-      return toPut(proto);
+    if (type == MutationType.PUT) {
+      return toPut(proto, null);
     }
-    throw new IOException("Not an understood mutate type " + type);
+    throw new IOException("Unknown mutation type " + type);
   }
 
   /**
@@ -487,13 +574,44 @@ public final class ProtobufUtil {
    * @return the converted client Increment
    * @throws IOException
    */
-  public static Increment toIncrement(
-      final Mutate proto) throws IOException {
-    MutateType type = proto.getMutateType();
-    assert type == MutateType.INCREMENT : type.name();
-    byte[] row = proto.getRow().toByteArray();
-    Increment increment = new Increment(row);
-    increment.setWriteToWAL(proto.getWriteToWAL());
+  public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
+  throws IOException {
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.INCREMENT : type.name();
+    byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+    Increment increment = null;
+    int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+          TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+            " no cell returned: " + TextFormat.shortDebugString(proto));
+        }
+        Cell cell = cellScanner.current();
+        if (increment == null) {
+          increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+        }
+        increment.add(KeyValueUtil.ensureKeyValue(cell));
+      }
+    } else {
+      increment = new Increment(row);
+      for (ColumnValue column: proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv: column.getQualifierValueList()) {
+          byte[] qualifier = qv.getQualifier().toByteArray();
+          if (!qv.hasValue()) {
+            throw new DoNotRetryIOException("Missing required field: qualifer value");
+          }
+          long value = Bytes.toLong(qv.getValue().toByteArray());
+          increment.addColumn(family, qualifier, value);
+        }
+      }
+    }
     if (proto.hasTimeRange()) {
       HBaseProtos.TimeRange timeRange = proto.getTimeRange();
       long minStamp = 0;
@@ -506,18 +624,7 @@ public final class ProtobufUtil {
       }
       increment.setTimeRange(minStamp, maxStamp);
     }
-    for (ColumnValue column: proto.getColumnValueList()) {
-      byte[] family = column.getFamily().toByteArray();
-      for (QualifierValue qv: column.getQualifierValueList()) {
-        byte[] qualifier = qv.getQualifier().toByteArray();
-        if (!qv.hasValue()) {
-          throw new DoNotRetryIOException(
-            "Missing required field: qualifer value");
-        }
-        long value = Bytes.toLong(qv.getValue().toByteArray());
-        increment.addColumn(family, qualifier, value);
-      }
-    }
+    increment.setWriteToWAL(proto.getWriteToWAL());
     return increment;
   }
 
@@ -733,10 +840,10 @@ public final class ProtobufUtil {
    * @param increment
    * @return the converted mutate
    */
-  public static Mutate toMutate(final Increment increment) {
-    Mutate.Builder builder = Mutate.newBuilder();
+  public static MutationProto toMutation(final Increment increment) {
+    MutationProto.Builder builder = MutationProto.newBuilder();
     builder.setRow(ByteString.copyFrom(increment.getRow()));
-    builder.setMutateType(MutateType.INCREMENT);
+    builder.setMutateType(MutationType.INCREMENT);
     builder.setWriteToWAL(increment.getWriteToWAL());
     TimeRange timeRange = increment.getTimeRange();
     if (!timeRange.isAllTime()) {
@@ -768,27 +875,14 @@ public final class ProtobufUtil {
   /**
    * Create a protocol buffer Mutate based on a client Mutation
    *
-   * @param mutateType
+   * @param type
    * @param mutation
-   * @return a mutate
+   * @return a protobuf'd Mutation
    * @throws IOException
    */
-  public static Mutate toMutate(final MutateType mutateType,
-      final Mutation mutation) throws IOException {
-    Mutate.Builder mutateBuilder = Mutate.newBuilder();
-    mutateBuilder.setRow(ByteString.copyFrom(mutation.getRow()));
-    mutateBuilder.setMutateType(mutateType);
-    mutateBuilder.setWriteToWAL(mutation.getWriteToWAL());
-    mutateBuilder.setTimestamp(mutation.getTimeStamp());
-    Map<String, byte[]> attributes = mutation.getAttributesMap();
-    if (!attributes.isEmpty()) {
-      NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
-      for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
-        attributeBuilder.setName(attribute.getKey());
-        attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
-        mutateBuilder.addAttribute(attributeBuilder.build());
-      }
-    }
+  public static MutationProto toMutation(final MutationType type, final Mutation mutation)
+  throws IOException {
+    MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
     ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
     QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
     for (Map.Entry<byte[],List<? extends Cell>> family: mutation.getFamilyMap().entrySet()) {
@@ -799,15 +893,56 @@ public final class ProtobufUtil {
         valueBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
         valueBuilder.setValue(ByteString.copyFrom(kv.getValue()));
         valueBuilder.setTimestamp(kv.getTimestamp());
-        if (mutateType == MutateType.DELETE) {
+        if (type == MutationType.DELETE) {
           KeyValue.Type keyValueType = KeyValue.Type.codeToType(kv.getType());
           valueBuilder.setDeleteType(toDeleteType(keyValueType));
         }
         columnBuilder.addQualifierValue(valueBuilder.build());
       }
-      mutateBuilder.addColumnValue(columnBuilder.build());
+      builder.addColumnValue(columnBuilder.build());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Create a protocol buffer MutationProto based on a client Mutation.  Does NOT include data.
+   * The Cells are expected to be transported by some means other than protobuf.
+   * @param type
+   * @param mutation
+   * @return a protobuf'd Mutation
+   * @throws IOException
+   */
+  public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation)
+  throws IOException {
+    MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+    builder.setAssociatedCellCount(mutation.size());
+    return builder.build();
+  }
+
+  /**
+   * Code shared by {@link #toMutation(MutationType, Mutation)} and
+   * {@link #toMutationNoData(MutationType, Mutation)}
+   * @param type
+   * @param mutation
+   * @return A partly-filled out protobuf'd Mutation.
+   */
+  private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type,
+      final Mutation mutation) {
+    MutationProto.Builder builder = MutationProto.newBuilder();
+    builder.setRow(ByteString.copyFrom(mutation.getRow()));
+    builder.setMutateType(type);
+    builder.setWriteToWAL(mutation.getWriteToWAL());
+    builder.setTimestamp(mutation.getTimeStamp());
+    Map<String, byte[]> attributes = mutation.getAttributesMap();
+    if (!attributes.isEmpty()) {
+      NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
     }
-    return mutateBuilder.build();
+    return builder;
   }
 
   /**
@@ -821,25 +956,66 @@ public final class ProtobufUtil {
     Cell [] cells = result.raw();
     if (cells != null) {
       for (Cell c : cells) {
-        builder.addKeyValue(toKeyValue(c));
+        builder.addCell(toCell(c));
       }
     }
     return builder.build();
   }
 
   /**
+   * Convert a client Result to a protocol buffer Result.
+   * The pb Result does not include the Cell data.  The Cells are transported by other means.
+   *
+   * @param result the client Result to convert
+   * @return the converted protocol buffer Result
+   */
+  public static ClientProtos.Result toResultNoData(final Result result) {
+    ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
+    builder.setAssociatedCellCount(result.size());
+    return builder.build();
+  }
+
+  /**
    * Convert a protocol buffer Result to a client Result
    *
    * @param proto the protocol buffer Result to convert
    * @return the converted client Result
    */
   public static Result toResult(final ClientProtos.Result proto) {
-    List<HBaseProtos.KeyValue> values = proto.getKeyValueList();
-    List<KeyValue> keyValues = new ArrayList<KeyValue>(values.size());
-    for (HBaseProtos.KeyValue kv: values) {
-      keyValues.add(toKeyValue(kv));
+    List<HBaseProtos.Cell> values = proto.getCellList();
+    List<Cell> cells = new ArrayList<Cell>(values.size());
+    for (HBaseProtos.Cell c: values) {
+      cells.add(toCell(c));
+    }
+    return new Result(cells);
+  }
+
+  /**
+   * Convert a protocol buffer Result to a client Result
+   *
+   * @param proto the protocol buffer Result to convert
+   * @param scanner Optional cell scanner.
+   * @return the converted client Result
+   * @throws IOException
+   */
+  public static Result toResult(final ClientProtos.Result proto, final CellScanner scanner)
+  throws IOException {
+    // TODO: Unit test that has some Cells in scanner and some in the proto.
+    List<Cell> cells = null;
+    if (proto.hasAssociatedCellCount()) {
+      int count = proto.getAssociatedCellCount();
+      cells = new ArrayList<Cell>(count);
+      for (int i = 0; i < count; i++) {
+        if (!scanner.advance()) throw new IOException("Failed get " + i + " of " + count);
+        cells.add(scanner.current());
+      }
+    }
+    List<HBaseProtos.Cell> values = proto.getCellList();
+    if (cells == null) cells = new ArrayList<Cell>(values.size());
+    for (HBaseProtos.Cell c: values) {
+      cells.add(toCell(c));
     }
-    return new Result(keyValues);
+    return new Result(cells);
   }
 
   /**
@@ -1012,55 +1188,6 @@ public final class ProtobufUtil {
   }
 
   /**
-   * A helper to invoke a multi action using client protocol.
-   *
-   * @param client
-   * @param multi
-   * @return a multi response
-   * @throws IOException
-   */
-  public static <R> MultiResponse multi(final ClientProtocol client,
-      final MultiAction<R> multi) throws IOException {
-    MultiResponse response = new MultiResponse();
-    for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
-      byte[] regionName = e.getKey();
-      int rowMutations = 0;
-      List<Action<R>> actions = e.getValue();
-      for (Action<R> action: actions) {
-        Row row = action.getAction();
-        if (row instanceof RowMutations) {
-          try {
-            MultiRequest request =
-                RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
-            client.multi(null, request);
-            response.add(regionName, action.getOriginalIndex(), new Result());
-          } catch (ServiceException se) {
-            response.add(regionName, action.getOriginalIndex(), getRemoteException(se));
-          }
-          rowMutations++;
-        }
-      }
-      if (actions.size() > rowMutations) {
-        Exception ex = null;
-        List<Object> results = null;
-        try {
-          MultiRequest request =
-              RequestConverter.buildMultiRequest(regionName, actions);
-          ClientProtos.MultiResponse proto = client.multi(null, request);
-          results = ResponseConverter.getResults(proto);
-        } catch (ServiceException se) {
-          ex = getRemoteException(se);
-        }
-        for (int i = 0, n = actions.size(); i < n; i++) {
-          int originalIndex = actions.get(i).getOriginalIndex();
-          response.add(regionName, originalIndex, results == null ? ex : results.get(i));
-        }
-      }
-    }
-    return response;
-  }
-
-  /**
    * A helper to bulk load a list of HFiles using client protocol.
    *
    * @param client
@@ -1731,33 +1858,31 @@ public final class ProtobufUtil {
     throw new IOException(se);
   }
 
-  public static HBaseProtos.KeyValue toKeyValue(final Cell kv) {
+  public static HBaseProtos.Cell toCell(final Cell kv) {
     // Doing this is going to kill us if we do it for all data passed.
     // St.Ack 20121205
-    // TODO: Do a Cell version
-    HBaseProtos.KeyValue.Builder kvbuilder = HBaseProtos.KeyValue.newBuilder();
+    HBaseProtos.Cell.Builder kvbuilder = HBaseProtos.Cell.newBuilder();
     kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(),
       kv.getRowLength()));
     kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(),
       kv.getFamilyOffset(), kv.getFamilyLength()));
     kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(),
       kv.getQualifierOffset(), kv.getQualifierLength()));
-    kvbuilder.setKeyType(HBaseProtos.KeyType.valueOf(kv.getTypeByte()));
+    kvbuilder.setCellType(HBaseProtos.CellType.valueOf(kv.getTypeByte()));
     kvbuilder.setTimestamp(kv.getTimestamp());
     kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
     return kvbuilder.build();
   }
 
-  public static KeyValue toKeyValue(final HBaseProtos.KeyValue kv) {
+  public static Cell toCell(final HBaseProtos.Cell cell) {
     // Doing this is going to kill us if we do it for all data passed.
     // St.Ack 20121205
-    // TODO: Do a Cell version
-    return new KeyValue(kv.getRow().toByteArray(),
-      kv.getFamily().toByteArray(),
-      kv.getQualifier().toByteArray(),
-      kv.getTimestamp(),
-      KeyValue.Type.codeToType((byte)kv.getKeyType().getNumber()),
-      kv.getValue().toByteArray());
+    return CellUtil.createCell(cell.getRow().toByteArray(),
+      cell.getFamily().toByteArray(),
+      cell.getQualifier().toByteArray(),
+      cell.getTimestamp(),
+      (byte)cell.getCellType().getNumber(),
+      cell.getValue().toByteArray());
   }
 
   /**

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Mar 20 19:36:46 2013
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -58,11 +61,11 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -92,8 +95,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
-import java.io.IOException;
-import java.util.List;
+import com.google.protobuf.ByteString;
 
 /**
  * Helper utility to build protocol buffer requests,
@@ -206,9 +208,9 @@ public final class RequestConverter {
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
 
-    Mutate.Builder mutateBuilder = Mutate.newBuilder();
+    MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
     mutateBuilder.setRow(ByteString.copyFrom(row));
-    mutateBuilder.setMutateType(MutateType.INCREMENT);
+    mutateBuilder.setMutateType(MutationType.INCREMENT);
     mutateBuilder.setWriteToWAL(writeToWAL);
     ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
     columnBuilder.setFamily(ByteString.copyFrom(family));
@@ -217,8 +219,7 @@ public final class RequestConverter {
     valueBuilder.setQualifier(ByteString.copyFrom(qualifier));
     columnBuilder.addQualifierValue(valueBuilder.build());
     mutateBuilder.addColumnValue(columnBuilder.build());
-
-    builder.setMutate(mutateBuilder.build());
+    builder.setMutation(mutateBuilder.build());
     return builder.build();
   }
 
@@ -245,7 +246,7 @@ public final class RequestConverter {
     builder.setRegion(region);
     Condition condition = buildCondition(
       row, family, qualifier, comparator, compareType);
-    builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
     builder.setCondition(condition);
     return builder.build();
   }
@@ -273,7 +274,7 @@ public final class RequestConverter {
     builder.setRegion(region);
     Condition condition = buildCondition(
       row, family, qualifier, comparator, compareType);
-    builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
     builder.setCondition(condition);
     return builder.build();
   }
@@ -292,7 +293,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
     return builder.build();
   }
 
@@ -310,7 +311,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, append));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append));
     return builder.build();
   }
 
@@ -327,7 +328,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutate(ProtobufUtil.toMutate(increment));
+    builder.setMutation(ProtobufUtil.toMutation(increment));
     return builder.build();
   }
 
@@ -345,7 +346,7 @@ public final class RequestConverter {
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+    builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
     return builder.build();
   }
 
@@ -358,29 +359,64 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static MultiRequest buildMultiRequest(final byte[] regionName,
-      final RowMutations rowMutations) throws IOException {
-    MultiRequest.Builder builder = MultiRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    builder.setAtomic(true);
+      final RowMutations rowMutations)
+  throws IOException {
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
     for (Mutation mutation: rowMutations.getMutations()) {
-      MutateType mutateType = null;
+      MutationType mutateType = null;
       if (mutation instanceof Put) {
-        mutateType = MutateType.PUT;
+        mutateType = MutationType.PUT;
       } else if (mutation instanceof Delete) {
-        mutateType = MutateType.DELETE;
+        mutateType = MutationType.DELETE;
       } else {
-        throw new DoNotRetryIOException(
-          "RowMutations supports only put and delete, not "
-            + mutation.getClass().getName());
+        throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
+          mutation.getClass().getName());
+      }
+      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
+      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Create a protocol buffer MultiRequest for row mutations that does not hold data.  Data/Cells
+   * are carried outside of protobuf.  Return references to the Cells in <code>cells</code> param
+   *
+   * @param regionName
+   * @param rowMutations
+   * @param cells Return in here a list of Cells as CellScannable.
+   * @return a multi request minus data
+   * @throws IOException
+   */
+  public static MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+      final RowMutations rowMutations, final List<CellScannable> cells)
+  throws IOException {
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+    for (Mutation mutation: rowMutations.getMutations()) {
+      MutationType type = null;
+      if (mutation instanceof Put) {
+        type = MutationType.PUT;
+      } else if (mutation instanceof Delete) {
+        type = MutationType.DELETE;
+      } else {
+        throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
+          mutation.getClass().getName());
       }
-      Mutate mutate = ProtobufUtil.toMutate(mutateType, mutation);
-      builder.addAction(MultiAction.newBuilder().setMutate(mutate).build());
+      MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
+      cells.add(mutation);
+      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
     }
     return builder.build();
   }
 
+  private static MultiRequest.Builder getMultiRequestBuilderWithRegionAndAtomicSet(final byte [] regionName,
+      final boolean atomic) {
+    MultiRequest.Builder builder = MultiRequest.newBuilder();
+    RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    builder.setRegion(region);
+    return builder.setAtomic(atomic);
+  }
+
   /**
    * Create a protocol buffer ScanRequest for a client Scan
    *
@@ -475,25 +511,22 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
-      final List<Action<R>> actions) throws IOException {
-    MultiRequest.Builder builder = MultiRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
+      final List<Action<R>> actions)
+  throws IOException {
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
     for (Action<R> action: actions) {
       MultiAction.Builder protoAction = MultiAction.newBuilder();
-
       Row row = action.getAction();
       if (row instanceof Get) {
         protoAction.setGet(ProtobufUtil.toGet((Get)row));
       } else if (row instanceof Put) {
-        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
       } else if (row instanceof Delete) {
-        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
       } else if (row instanceof Append) {
-        protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
       } else if (row instanceof Increment) {
-        protoAction.setMutate(ProtobufUtil.toMutate((Increment)row));
+        protoAction.setMutation(ProtobufUtil.toMutation((Increment)row));
       } else if (row instanceof RowMutations) {
         continue; // ignore RowMutations
       } else {
@@ -505,6 +538,68 @@ public final class RequestConverter {
     return builder.build();
   }
 
+  /**
+   * Create a protocol buffer multirequest with NO data for a list of actions (data is carried
+   * otherwise than via protobuf).  This means it just notes attributes, whether to write the
+   * WAL, etc., and the presence in protobuf serves as place holder for the data which is
+   * coming along otherwise.  Note that Get is different.  It does not contain 'data' and is always
+   * carried by protobuf.  We return references to the data by adding them to the passed in
+   * <code>cells</code> param.
+   *
+   * RowMutations in the list (if any) will be ignored.
+   *
+   * @param regionName
+   * @param actions
+   * @param cells Place to stuff references to actual data.
+   * @return a multi request that does not carry any data.
+   * @throws IOException
+   */
+  public static <R> MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+      final List<Action<R>> actions, final List<CellScannable> cells)
+  throws IOException {
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+    for (Action<R> action: actions) {
+      MultiAction.Builder protoAction = MultiAction.newBuilder();
+      Row row = action.getAction();
+      if (row instanceof Get) {
+        // Gets are carried by protobufs.
+        protoAction.setGet(ProtobufUtil.toGet((Get)row));
+      } else if (row instanceof Put) {
+        Put p = (Put)row;
+        cells.add(p);
+        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p));
+      } else if (row instanceof Delete) {
+        Delete d = (Delete)row;
+        int size = d.size();
+        // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
+        // in it but the row to delete.  In this case, the current implementation does not make
+        // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
+        // where the size returned is zero, we will send the Delete fully pb'd rather than have
+        // metadata only in the pb and then send the kv along the side in cells.
+        if (size > 0) {
+          cells.add(d);
+          protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
+        } else {
+          protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
+        }
+      } else if (row instanceof Append) {
+        Append a = (Append)row;
+        cells.add(a);
+        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
+      } else if (row instanceof Increment) {
+        Increment i = (Increment)row;
+        cells.add(i);
+        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
+      } else if (row instanceof RowMutations) {
+        continue; // ignore RowMutations
+      } else {
+        throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
+      }
+      builder.addAction(protoAction.build());
+    }
+    return builder.build();
+  }
+
 // End utilities for Client
 //Start utilities for Admin
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Wed Mar 20 19:36:46 2013
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Result;
@@ -42,9 +45,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
 
 /**
  * Helper utility to build protocol buffer responses,
@@ -78,11 +80,13 @@ public final class ResponseConverter {
    * Get the results from a protocol buffer MultiResponse
    *
    * @param proto the protocol buffer MultiResponse to convert
-   * @return the results in the MultiResponse
+   * @param cells Cells to go with the passed in <code>proto</code>.  Can be null.
+   * @return the results that were in the MultiResponse (a Result or an Exception).
    * @throws IOException
    */
-  public static List<Object> getResults(
-      final ClientProtos.MultiResponse proto) throws IOException {
+  public static List<Object> getResults(final ClientProtos.MultiResponse proto,
+      final CellScanner cells)
+  throws IOException {
     List<Object> results = new ArrayList<Object>();
     List<ActionResult> resultList = proto.getResultList();
     for (int i = 0, n = resultList.size(); i < n; i++) {
@@ -90,13 +94,8 @@ public final class ResponseConverter {
       if (result.hasException()) {
         results.add(ProtobufUtil.toException(result.getException()));
       } else if (result.hasValue()) {
-        ClientProtos.Result r = result.getValue();
-        Object value = ProtobufUtil.toResult(r);
-        if (value instanceof ClientProtos.Result) {
-          results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
-        } else {
-          results.add(value);
-        }
+        ClientProtos.Result value = result.getValue();
+        results.add(ProtobufUtil.toResult(value, cells));
       } else {
         results.add(new Result());
       }

Added: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java (added)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class) 
+public class TestIPCUtil {
+  IPCUtil util;
+  @Before
+  public void before() {
+    this.util = new IPCUtil(new Configuration());
+  }
+  
+  @Test
+  public void testBuildCellBlock() throws IOException {
+    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), null);
+    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new DefaultCodec());
+    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new GzipCodec());
+  }
+
+  void doBuildCellBlockUndoCellBlock(final Codec codec, final CompressionCodec compressor)
+  throws IOException {
+    final int count = 10;
+    Cell [] cells = getCells(count);
+    ByteBuffer bb = this.util.buildCellBlock(codec, compressor,
+      CellUtil.createCellScanner(Arrays.asList(cells).iterator()));
+    CellScanner scanner =
+      this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+    int i = 0;
+    while (scanner.advance()) {
+      i++;
+    }
+    assertEquals(count, i);
+  }
+
+  static Cell [] getCells(final int howMany) {
+    Cell [] cells = new Cell[howMany];
+    for (int i = 0; i < howMany; i++) {
+      byte [] index = Bytes.toBytes(i);
+      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+      cells[i] = kv;
+    }
+    return cells;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java (added)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,156 @@
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(SmallTests.class)
+public class TestPayloadCarryingRpcController {
+  @Test
+  public void testListOfCellScannerables() {
+    List<CellScannable> cells = new ArrayList<CellScannable>();
+    final int count = 10;
+    for (int i = 0; i < count; i++) {
+      cells.add(createCell(i));
+    }
+    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+    CellScanner cellScanner = controller.cellScanner();
+    int index = 0;
+    for (; cellScanner.advance(); index++) {
+      Cell cell = cellScanner.current();
+      byte [] indexBytes = Bytes.toBytes(index);
+      assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(),
+        cell.getValueOffset(), cell.getValueLength()));
+    }
+    assertEquals(count, index);
+  }
+
+  /**
+   * @param index
+   * @return A CellScannable over one faked out 'Cell' that does nothing but return index as its value
+   */
+  static CellScannable createCell(final int index) {
+    return new CellScannable() {
+      @Override
+      public CellScanner cellScanner() {
+        return new CellScanner() {
+          @Override
+          public Cell current() {
+            // Fake out a Cell.  All this Cell has is a value that is an int in size and equal
+            // to the above 'index' param serialized as an int.
+            return new Cell() {
+              private final int i = index;
+
+              @Override
+              public byte[] getRowArray() {
+                // TODO Auto-generated method stub
+                return null;
+              }
+
+              @Override
+              public int getRowOffset() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public short getRowLength() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public byte[] getFamilyArray() {
+                // TODO Auto-generated method stub
+                return null;
+              }
+
+              @Override
+              public int getFamilyOffset() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public byte getFamilyLength() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public byte[] getQualifierArray() {
+                // TODO Auto-generated method stub
+                return null;
+              }
+
+              @Override
+              public int getQualifierOffset() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public int getQualifierLength() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public long getTimestamp() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public byte getTypeByte() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public long getMvccVersion() {
+                // TODO Auto-generated method stub
+                return 0;
+              }
+
+              @Override
+              public byte[] getValueArray() {
+                return Bytes.toBytes(this.i);
+              }
+
+              @Override
+              public int getValueOffset() {
+                return 0;
+              }
+
+              @Override
+              public int getValueLength() {
+                return Bytes.SIZEOF_INT;
+              }
+            };
+          }
+
+          private boolean hasCell = true;
+          @Override
+          public boolean advance() {
+            // We have one Cell only so return true first time then false ever after.
+            if (!hasCell) return hasCell;
+            hasCell = false;
+            return true;
+          }
+        };
+      }
+    };
+  }
+}

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Mar 20 19:36:46 2013
@@ -55,8 +55,8 @@ public final class HConstants {
   /**
    * The first four bytes of Hadoop RPC connections
    */
-  public static final ByteBuffer RPC_HEADER = ByteBuffer.wrap("hrpc".getBytes());
-  public static final byte CURRENT_VERSION = 5;
+  public static final ByteBuffer RPC_HEADER = ByteBuffer.wrap("HBas".getBytes());
+  public static final byte RPC_CURRENT_VERSION = 0;
 
   // HFileBlock constants.
 



Mime
View raw message