phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject [36/50] merge master to 4.0 branch
Date Mon, 10 Mar 2014 06:20:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/b3a330ca/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index bd43010,89cc4d2..0dab94d
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@@ -59,14 -61,9 +59,13 @@@ import java.util.ArrayList
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;
- import java.util.Map;
  
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.Coprocessor;
 +import org.apache.hadoop.hbase.CoprocessorEnvironment;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValue.Type;
  import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Mutation;
@@@ -80,28 -76,15 +79,29 @@@ import org.apache.hadoop.hbase.filter.F
  import org.apache.hadoop.hbase.filter.FilterList;
  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
- import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
- import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
  import org.apache.hadoop.hbase.regionserver.HRegion;
 +import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
  import org.apache.hadoop.hbase.regionserver.RegionScanner;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  import org.apache.phoenix.cache.GlobalCache;
+ import org.apache.phoenix.client.GenericKeyValueBuilder;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+ import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
  import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 +import org.apache.phoenix.protobuf.ProtobufUtil;
  import org.apache.phoenix.query.QueryConstants;
  import org.apache.phoenix.schema.AmbiguousColumnException;
  import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@@ -129,10 -112,8 +129,11 @@@ import org.apache.phoenix.util.ServerUt
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import com.google.common.cache.Cache;
  import com.google.common.collect.Lists;
 +import com.google.protobuf.RpcCallback;
 +import com.google.protobuf.RpcController;
 +import com.google.protobuf.Service;
  
  /**
   * 
@@@ -235,14 -221,10 +241,11 @@@ public class MetaDataEndpointImpl exten
              return null;
          }
          int length = getVarCharLength(keyBuffer, keyOffset, keyLength);
-         // TODO: PNameImpl that doesn't need to copy the bytes
-         byte[] pnameBuf = new byte[length];
-         System.arraycopy(keyBuffer, keyOffset, pnameBuf, 0, length);
-         return PNameFactory.newName(pnameBuf);
+         return PNameFactory.newName(keyBuffer, keyOffset, length);
      }
 -    
 -    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) throws IOException {
 +
 +    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
 +            throws IOException {
          Scan scan = new Scan();
          scan.setTimeRange(startTimeStamp, stopTimeStamp);
          scan.setStartRow(key);
@@@ -252,159 -234,18 +255,158 @@@
          return scan;
      }
  
 +    private RegionCoprocessorEnvironment env;
 +
 +    private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
 +        byte[] startKey = region.getStartKey();
 +        byte[] endKey = region.getEndKey();
 +        if (Bytes.compareTo(startKey, key) <= 0
 +                && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
 +                    endKey) < 0)) {
 +            return null; // normal case;
 +        }
 +        return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION,
 +                EnvironmentEdgeManager.currentTimeMillis(), null);
 +    }
 +
 +    /**
 +     * Stores a reference to the coprocessor environment provided by the
 +     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
 +     * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
 +     * on a table region, so always expects this to be an instance of
 +     * {@link RegionCoprocessorEnvironment}.
 +     * @param env the environment provided by the coprocessor host
 +     * @throws IOException if the provided environment is not an instance of
 +     *             {@code RegionCoprocessorEnvironment}
 +     */
      @Override
 -    public RegionCoprocessorEnvironment getEnvironment() {
 -        return (RegionCoprocessorEnvironment)super.getEnvironment();
 +    public void start(CoprocessorEnvironment env) throws IOException {
 +        if (env instanceof RegionCoprocessorEnvironment) {
 +            this.env = (RegionCoprocessorEnvironment) env;
 +        } else {
 +            throw new CoprocessorException("Must be loaded on a table region!");
 +        }
      }
 -    
 -    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException, SQLException {
 +
 +    @Override
 +    public void stop(CoprocessorEnvironment env) throws IOException {
 +        // nothing to do
 +    }
 +
 +    @Override
 +    public Service getService() {
 +        return this;
 +    }
 +
 +    @Override
 +    public void getTable(RpcController controller, GetTableRequest request,
 +            RpcCallback<MetaDataResponse> done) {
 +        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
 +        byte[] tenantId = request.getTenantId().toByteArray();
 +        byte[] schemaName = request.getSchemaName().toByteArray();
 +        byte[] tableName = request.getTableName().toByteArray();
 +        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
 +        long tableTimeStamp = request.getTableTimestamp();
 +
 +        try {
 +            // TODO: check that key is within region.getStartKey() and region.getEndKey()
 +            // and return special code to force client to lookup region from meta.
 +            HRegion region = env.getRegion();
 +            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
 +            if (result != null) {
 +                done.run(MetaDataMutationResult.toProto(result));
 +                return;
 +            }
 +
 +            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
 +            PTable table = doGetTable(key, request.getClientTimestamp());
 +            if (table == null) {
 +                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
 +                builder.setMutationTime(currentTime);
 +                done.run(builder.build());
 +                return;
 +            }
 +            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +            builder.setMutationTime(currentTime);
 +            if (table.getTimeStamp() != tableTimeStamp) {
 +                builder.setTable(PTableImpl.toProto(table));
 +            }
 +            done.run(builder.build());
 +            return;
 +        } catch (Throwable t) {
 +        	logger.error("getTable failed", t);
 +            ProtobufUtil.setControllerException(controller,
 +                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
 +        }
 +    }
 +
 +    private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
 +        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-         Map<ImmutableBytesPtr, PTable> metaDataCache =
++        Cache<ImmutableBytesPtr, PTable> metaDataCache =
 +                GlobalCache.getInstance(this.env).getMetaDataCache();
-         PTable table = metaDataCache.get(cacheKey);
++        PTable table = metaDataCache.getIfPresent(cacheKey);
 +        // We only cache the latest, so we'll end up building the table with every call if the
 +        // client connection has specified an SCN.
 +        // TODO: If we indicate to the client that we're returning an older version, but there's a
 +        // newer version available, the client
 +        // can safely not call this, since we only allow modifications to the latest.
 +        if (table != null && table.getTimeStamp() < clientTimeStamp) {
 +            // Table on client is up-to-date with table on server, so just return
 +            if (isTableDeleted(table)) {
 +                return null;
 +            }
 +            return table;
 +        }
 +        // Ask Lars about the expense of this call - if we don't take the lock, we still won't get
 +        // partial results
 +        // get the co-processor environment
 +        // TODO: check that key is within region.getStartKey() and region.getEndKey()
 +        // and return special code to force client to lookup region from meta.
 +        HRegion region = env.getRegion();
 +        /*
 +         * Lock directly on key, though it may be an index table. This will just prevent a table
 +         * from getting rebuilt too often.
 +         */
 +        RowLock rowLock = region.getRowLock(key);
 +        if (rowLock == null) {
 +            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
 +        }
 +        try {
 +            // Try cache again in case we were waiting on a lock
-             table = metaDataCache.get(cacheKey);
++            table = metaDataCache.getIfPresent(cacheKey);
 +            // We only cache the latest, so we'll end up building the table with every call if the
 +            // client connection has specified an SCN.
 +            // TODO: If we indicate to the client that we're returning an older version, but there's
 +            // a newer version available, the client
 +            // can safely not call this, since we only allow modifications to the latest.
 +            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 +                // Table on client is up-to-date with table on server, so just return
 +                if (isTableDeleted(table)) {
 +                    return null;
 +                }
 +                return table;
 +            }
 +            // Query for the latest table first, since it's not cached
 +            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
 +            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 +                return table;
 +            }
 +            // Otherwise, query for an older version of the table - it won't be cached
 +            return buildTable(key, cacheKey, region, clientTimeStamp);
 +        } finally {
 +            rowLock.release();
 +        }
 +    }
 +
 +    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
 +            long clientTimeStamp) throws IOException, SQLException {
          Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
          RegionScanner scanner = region.getScanner(scan);
-         Map<ImmutableBytesPtr, PTable> metaDataCache =
-                 GlobalCache.getInstance(this.env).getMetaDataCache();
 -        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++
++        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
          try {
-             PTable oldTable = metaDataCache.get(cacheKey);
-             long tableTimeStamp =
-                     oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
+             PTable oldTable = metaDataCache.getIfPresent(cacheKey);
+             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
              PTable newTable;
              newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
              if (newTable == null) {
@@@ -412,26 -253,9 +414,13 @@@
              }
              if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
                  if (logger.isDebugEnabled()) {
 -                    logger.debug("Caching table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " at seqNum " + newTable.getSequenceNumber() + " with newer timestamp " + newTable.getTimeStamp() + " versus " + tableTimeStamp);
 +                    logger.debug("Caching table "
 +                            + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
 +                                cacheKey.getLength()) + " at seqNum "
 +                            + newTable.getSequenceNumber() + " with newer timestamp "
 +                            + newTable.getTimeStamp() + " versus " + tableTimeStamp);
                  }
-                 oldTable = metaDataCache.put(cacheKey, newTable);
-                 if (logger.isDebugEnabled()) {
-                     if (oldTable == null) {
-                         logger.debug("No previously cached table "
-                                 + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
-                                     cacheKey.getLength()));
-                     } else {
-                         logger.debug("Previously cached table "
-                                 + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
-                                     cacheKey.getLength()) + " was at seqNum "
-                                 + oldTable.getSequenceNumber() + " with timestamp "
-                                 + oldTable.getTimeStamp());
-                     }
-                 }
+                 metaDataCache.put(cacheKey, newTable);
              }
              return newTable;
          } finally {
@@@ -469,55 -290,33 +458,56 @@@
                  i++; // shouldn't happen - means unexpected KV in system table column row
              }
          }
-         // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV
-         // are required.
-         if (colKeyValues[SQL_DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
 -        // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV are required.
++
+         if (colKeyValues[DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
                  || colKeyValues[ORDINAL_POSITION_INDEX] == null) {
 -            throw new IllegalStateException("Didn't find all required key values in '" + colName.getString() + "' column metadata row");
 +            throw new IllegalStateException("Didn't find all required key values in '"
 +                    + colName.getString() + "' column metadata row");
          }
 -        KeyValue columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
 -        Integer maxLength = columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(columnSizeKv.getBuffer(), columnSizeKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
 -        Integer scale = decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(decimalDigitKv.getBuffer(), decimalDigitKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
 -        int position = PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getBuffer(), ordinalPositionKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue nullableKv = colKeyValues[NULLABLE_INDEX];
 -        boolean isNullable = PDataType.INTEGER.getCodec().decodeInt(nullableKv.getBuffer(), nullableKv.getValueOffset(), SortOrder.getDefault()) != ResultSetMetaData.columnNoNulls;
 -        KeyValue dataTypeKv = colKeyValues[DATA_TYPE_INDEX];
 -        PDataType dataType = PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(dataTypeKv.getBuffer(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));
 -        KeyValue sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
 -        SortOrder sortOrder = sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PDataType.INTEGER.getCodec().decodeInt(sortOrderKv.getBuffer(), sortOrderKv.getValueOffset(), SortOrder.getDefault()));
 -        KeyValue arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
 -        Integer arraySize = arraySizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getBuffer(), arraySizeKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue viewConstantKv = colKeyValues[VIEW_CONSTANT_INDEX];
++
 +        Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
 +        Integer maxLength =
 +                columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
-                     columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), null);
++                    columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), SortOrder.getDefault());
 +        Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
 +        Integer scale =
 +                decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
-                     decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), null);
++                    decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), SortOrder.getDefault());
 +        Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
 +        int position =
 +                PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
-                     ordinalPositionKv.getValueOffset(), null);
++                    ordinalPositionKv.getValueOffset(), SortOrder.getDefault());
 +        Cell nullableKv = colKeyValues[NULLABLE_INDEX];
 +        boolean isNullable =
 +                PDataType.INTEGER.getCodec().decodeInt(nullableKv.getValueArray(),
-                     nullableKv.getValueOffset(), null) != ResultSetMetaData.columnNoNulls;
-         Cell sqlDataTypeKv = colKeyValues[SQL_DATA_TYPE_INDEX];
++                    nullableKv.getValueOffset(), SortOrder.getDefault()) != ResultSetMetaData.columnNoNulls;
++        Cell dataTypeKv = colKeyValues[DATA_TYPE_INDEX];
 +        PDataType dataType =
 +                PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(
-                     sqlDataTypeKv.getValueArray(), sqlDataTypeKv.getValueOffset(), null));
++                  dataTypeKv.getValueArray(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));
 +        if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY; // For
 +                                                                                               // backward
 +                                                                                               // compatibility.
-         Cell columnModifierKv = colKeyValues[COLUMN_MODIFIER_INDEX];
-         ColumnModifier sortOrder =
-                 columnModifierKv == null ? null : ColumnModifier.fromSystemValue(PDataType.INTEGER
-                         .getCodec().decodeInt(columnModifierKv.getValueArray(),
-                             columnModifierKv.getValueOffset(), null));
++        Cell sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
++        SortOrder sortOrder =
++        		sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PDataType.INTEGER
++                        .getCodec().decodeInt(sortOrderKv.getValueArray(),
++                        		sortOrderKv.getValueOffset(), SortOrder.getDefault()));
 +        
 +        Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
 +        Integer arraySize = arraySizeKv == null ? null : 
-           PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), null);
-     
-         PColumn column =
-                 new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
-                         position - 1, sortOrder, arraySize);
++          PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), SortOrder.getDefault());
++ 
++        Cell viewConstantKv = colKeyValues[VIEW_CONSTANT_INDEX];
+         byte[] viewConstant = viewConstantKv == null ? null : viewConstantKv.getValue();
 -        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant);
++        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
++                        position - 1, sortOrder, arraySize, viewConstant);
          columns.add(column);
      }
 -
 -    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) throws IOException, SQLException {
 -        List<KeyValue> results = Lists.newArrayList();
 +    
 +    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
 +        throws IOException, SQLException {
 +        List<Cell> results = Lists.newArrayList();
          scanner.next(results);
          if (results.isEmpty()) {
              return null;
@@@ -531,15 -330,17 +521,18 @@@
          int keyLength = keyValue.getRowLength();
          int keyOffset = keyValue.getRowOffset();
          PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
--        int tenantIdLength = tenantId.getBytes().length;
-         PName schemaName = newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength);
++        int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
+         if (tenantIdLength == 0) {
+             tenantId = null;
+         }
+         PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
          int schemaNameLength = schemaName.getBytes().length;
 -        int tableNameLength = keyLength-schemaNameLength-1-tenantIdLength-1;
 +        int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
          byte[] tableNameBytes = new byte[tableNameLength];
 -        System.arraycopy(keyBuffer, keyOffset+schemaNameLength+1+tenantIdLength+1, tableNameBytes, 0, tableNameLength);
 +        System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
 +            tableNameBytes, 0, tableNameLength);
          PName tableName = PNameFactory.newName(tableNameBytes);
 -        
 +    
          int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
          // This will prevent the client from continually looking for the current
          // table when we know that there will never be one since we disallow updates
@@@ -577,89 -375,70 +570,92 @@@
          // TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
          if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
                  || tableKeyValues[COLUMN_COUNT_INDEX] == null) {
 -            throw new IllegalStateException("Didn't find expected key values for table row in metadata row");
 +            throw new IllegalStateException(
 +                    "Didn't find expected key values for table row in metadata row");
          }
 -        KeyValue tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
 -        PTableType tableType = PTableType.fromSerializedValue(tableTypeKv.getBuffer()[tableTypeKv.getValueOffset()]);
 -        KeyValue tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
 -        long tableSeqNum = PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getBuffer(), tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
 -        int columnCount = PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getBuffer(), columnCountKv.getValueOffset(), SortOrder.getDefault());
 -        KeyValue pkNameKv = tableKeyValues[PK_NAME_INDEX];
 -        PName pkName = pkNameKv != null ? newPName(pkNameKv.getBuffer(), pkNameKv.getValueOffset(), pkNameKv.getValueLength()) : null;
 -        KeyValue saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
 -        Integer saltBucketNum = saltBucketNumKv != null ? (Integer)PDataType.INTEGER.getCodec().decodeInt(saltBucketNumKv.getBuffer(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
 -        KeyValue dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
 -        PName dataTableName = dataTableNameKv != null ? newPName(dataTableNameKv.getBuffer(), dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
 -        KeyValue indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
 -        PIndexState indexState = indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv.getBuffer()[indexStateKv.getValueOffset()]);
 -        KeyValue immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
 -        boolean isImmutableRows = immutableRowsKv == null ? false : (Boolean)PDataType.BOOLEAN.toObject(immutableRowsKv.getBuffer(), immutableRowsKv.getValueOffset(), immutableRowsKv.getValueLength());
 -        KeyValue defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
 -        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getBuffer(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
 -        KeyValue viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
 -        String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getBuffer(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
 -        KeyValue disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
 -        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getBuffer(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
 -        KeyValue multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
 -        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getBuffer(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
 -        KeyValue viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
 -        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getBuffer()[viewTypeKv.getValueOffset()]);
 -        KeyValue viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
 -        Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getBuffer(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
++
 +        Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
 +        PTableType tableType =
 +                PTableType
 +                        .fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
 +        Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
 +        long tableSeqNum =
 +                PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
-                     tableSeqNumKv.getValueOffset(), null);
++                    tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
 +        Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
 +        int columnCount =
 +                PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getValueArray(),
-                     columnCountKv.getValueOffset(), null);
++                    columnCountKv.getValueOffset(), SortOrder.getDefault());
 +        Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
 +        PName pkName =
 +                pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
 +                    pkNameKv.getValueLength()) : null;
 +        Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
 +        Integer saltBucketNum =
 +                saltBucketNumKv != null ? (Integer) PDataType.INTEGER.getCodec().decodeInt(
-                     saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), null) : null;
++                    saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
 +        Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
 +        PName dataTableName =
 +                dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
 +                    dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
 +        Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
 +        PIndexState indexState =
 +                indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
 +                        .getValueArray()[indexStateKv.getValueOffset()]);
 +        Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
 +        boolean isImmutableRows =
 +                immutableRowsKv == null ? false : (Boolean) PDataType.BOOLEAN.toObject(
 +                    immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
 +                    immutableRowsKv.getValueLength());
 +        Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
 +        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
 +        Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
 +        String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
 +        Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
 +        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
 +        Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
 +        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
 +        Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
 +        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
++        Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
++        Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
          
          List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
          List<PTable> indexes = new ArrayList<PTable>();
          List<PName> physicalTables = new ArrayList<PName>();
          while (true) {
 -            results.clear();
 -            scanner.next(results);
 -            if (results.isEmpty()) {
 -                break;
 -            }
 -            KeyValue colKv = results.get(LINK_TYPE_INDEX);
 -            int colKeyLength = colKv.getRowLength();
 -            PName colName = newPName(colKv.getBuffer(), colKv.getRowOffset() + offset, colKeyLength-offset);
 -            int colKeyOffset = offset + colName.getBytes().length + 1;
 -            PName famName = newPName(colKv.getBuffer(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
 -            if (colName.getString().isEmpty() && famName != null) {
 -                LinkType linkType = LinkType.fromSerializedValue(colKv.getBuffer()[colKv.getValueOffset()]);
 -                if (linkType == LinkType.INDEX_TABLE) {
 -                    addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
 -                } else if (linkType == LinkType.PHYSICAL_TABLE) {
 -                    physicalTables.add(famName);
 -                } else {
 -                    logger.warn("Unknown link type: " + colKv.getBuffer()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
 -                }
 -            } else {
 -                addColumnToTable(results, colName, famName, colKeyValues, columns);
 -            }
 +          results.clear();
 +          scanner.next(results);
 +          if (results.isEmpty()) {
 +              break;
 +          }
 +          Cell colKv = results.get(LINK_TYPE_INDEX);
 +          int colKeyLength = colKv.getRowLength();
 +          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
 +          int colKeyOffset = offset + colName.getBytes().length + 1;
 +          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
 +          if (colName.getString().isEmpty() && famName != null) {
 +              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
 +              if (linkType == LinkType.INDEX_TABLE) {
 +                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
 +              } else if (linkType == LinkType.PHYSICAL_TABLE) {
 +                  physicalTables.add(famName);
 +              } else {
 +                  logger.warn("Unknown link type: " + colKv.getValueArray()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
 +              }
 +          } else {
 +              addColumnToTable(results, colName, famName, colKeyValues, columns);
 +          }
          }
--        
-         return PTableImpl.makePTable(schemaName, tableName, tableType, indexState, timeStamp, 
 -        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, 
 -                tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId);
++
++        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, 
 +            tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, 
 +            indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, 
-             multiTenant, viewType);
++            multiTenant, viewType, viewIndexId);
      }
  
 -    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
 +    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
 +        long clientTimeStamp) throws IOException {
          if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
              return null;
          }
@@@ -672,10 -451,9 +668,10 @@@
          scanner.next(results);
          // HBase ignores the time range on a raw scan (HBASE-7362)
          if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
 -            KeyValue kv = results.get(0);
 -            if (kv.isDelete()) {
 -                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
 +            Cell kv = results.get(0);
 +            if (kv.getTypeByte() == Type.Delete.getCode()) {
-                 Map<ImmutableBytesPtr, PTable> metaDataCache =
++                Cache<ImmutableBytesPtr, PTable> metaDataCache =
 +                        GlobalCache.getInstance(this.env).getMetaDataCache();
                  PTable table = newDeletedTableMarker(kv.getTimestamp());
                  metaDataCache.put(cacheKey, table);
                  return table;
@@@ -692,13 -470,10 +688,12 @@@
          return table.getName() == null;
      }
  
 -    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
 +    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
 +        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
 +        throws IOException, SQLException {
          HRegion region = env.getRegion();
-         Map<ImmutableBytesPtr, PTable> metaDataCache =
-                 GlobalCache.getInstance(this.env).getMetaDataCache();
-         PTable table = metaDataCache.get(cacheKey);
 -        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+         PTable table = metaDataCache.getIfPresent(cacheKey);
          // We always cache the latest version - fault in if not in cache
          if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
              return table;
@@@ -775,64 -530,44 +770,62 @@@
                  }
                  // Load child table next
                  ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
 -                // Get as of latest timestamp so we can detect if we have a newer table that already exists
 +                // Get as of latest timestamp so we can detect if we have a newer table that already
 +                // exists
                  // without making an additional query
 -                PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
 +                PTable table =
 +                        loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
                  if (table != null) {
                      if (table.getTimeStamp() < clientTimeStamp) {
 -                        // If the table is older than the client time stamp and it's deleted, continue
 +                        // If the table is older than the client time stamp and it's deleted,
 +                        // continue
                          if (!isTableDeleted(table)) {
 -                            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
 +                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
 +                            builder.setTable(PTableImpl.toProto(table));
 +                            done.run(builder.build());
 +                            return;
                          }
                      } else {
 -                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
 +                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
 +                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
 +                        builder.setTable(PTableImpl.toProto(table));
 +                        done.run(builder.build());
 +                        return;
                      }
                  }
 -                
 -                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the system
 +                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
 +                // system
                  // table. Basically, we get all the locks that we don't already hold for all the
 -                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring primary and
 -                // then index table locks are held, in that order). For now, we just don't support indexing
 -                // on the system table. This is an issue because of the way we manage batch mutation in the
 +                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
 +                // primary and
 +                // then index table locks are held, in that order). For now, we just don't support
 +                // indexing
 +                // on the system table. This is an issue because of the way we manage batch mutation
 +                // in the
                  // Indexer.
 -                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
 -                
 +                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
 +
                  // Invalidate the cache - the next getTable call will add it
-                 // TODO: consider loading the table that was just created here, patching up the
-                 // parent table, and updating the cache
-                 Map<ImmutableBytesPtr, PTable> metaDataCache =
-                         GlobalCache.getInstance(this.env).getMetaDataCache();
+                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
 -                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                  if (parentCacheKey != null) {
-                     metaDataCache.remove(parentCacheKey);
+                     metaDataCache.invalidate(parentCacheKey);
                  }
-                 metaDataCache.remove(cacheKey);
+                 metaDataCache.invalidate(cacheKey);
                  // Get timeStamp from mutations - the above method sets it if it's unset
                  long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
 -                return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTimeStamp, null);
 +                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
 +                builder.setMutationTime(currentTimeStamp);
 +                done.run(builder.build());
 +                return;
              } finally {
 -                releaseLocks(region, lids);
 +                region.releaseRowLocks(locks);
              }
          } catch (Throwable t) {
 -            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
 -            return null; // impossible
 +          logger.error("createTable failed", t);
 +            ProtobufUtil.setControllerException(controller,
 +                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
          }
      }
  
@@@ -916,65 -644,53 +909,63 @@@
              HRegion region = env.getRegion();
              MetaDataMutationResult result = checkTableKeyInRegion(key, region);
              if (result != null) {
 -                return result; 
 +                done.run(MetaDataMutationResult.toProto(result));
 +                return;
              }
 -            List<Integer> lids = Lists.newArrayList(5);
 +            List<RowLock> locks = Lists.newArrayList();
              try {
 -                acquireLock(region, lockKey, lids);
 +                acquireLock(region, lockKey, locks);
                  if (key != lockKey) {
 -                    acquireLock(region, key, lids);
 +                    acquireLock(region, key, locks);
                  }
                  List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
 -                result = doDropTable(key, tenantIdBytes, schemaName, tableName, PTableType.fromSerializedValue(tableType), tableMetadata, invalidateList, lids, tableNamesToDelete);
 -                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
 -                    return result;
 +                result =
 +                        doDropTable(key, tenantIdBytes, schemaName, tableName,
 +                            PTableType.fromSerializedValue(tableType), tableMetadata,
 +                            invalidateList, locks, tableNamesToDelete);
 +                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS
 +                        || result.getTable() == null) {
 +                    done.run(MetaDataMutationResult.toProto(result));
 +                    return;
                  }
-                 Map<ImmutableBytesPtr, PTable> metaDataCache =
-                         GlobalCache.getInstance(this.env).getMetaDataCache();
 -                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                  // Commit the list of deletion.
 -                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
 +                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                  long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
 -                for (ImmutableBytesPtr ckey: invalidateList) {
 +                for (ImmutableBytesPtr ckey : invalidateList) {
                      metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
                  }
                  if (parentTableName != null) {
                      ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
-                     metaDataCache.remove(parentCacheKey);
+                     metaDataCache.invalidate(parentCacheKey);
                  }
 -                return result;
 +                done.run(MetaDataMutationResult.toProto(result));
 +                return;
              } finally {
 -                releaseLocks(region, lids);
 +                region.releaseRowLocks(locks);
              }
          } catch (Throwable t) {
 -            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
 -            return null; // impossible
 +          logger.error("dropTable failed", t);
 +            ProtobufUtil.setControllerException(controller,
 +                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
          }
      }
  
 -    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName, PTableType tableType, 
 -            List<Mutation> rowsToDelete, List<ImmutableBytesPtr> invalidateList, List<Integer> lids, List<byte[]> tableNamesToDelete) throws IOException, SQLException {
 +    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
 +        byte[] tableName, PTableType tableType, List<Mutation> rowsToDelete,
 +        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
 +        List<byte[]> tableNamesToDelete) throws IOException, SQLException {
          long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
 -
 -        RegionCoprocessorEnvironment env = getEnvironment();
 +    
          HRegion region = env.getRegion();
          ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-     
-         Map<ImmutableBytesPtr, PTable> metaDataCache =
-                 GlobalCache.getInstance(this.env).getMetaDataCache();
-         PTable table = metaDataCache.get(cacheKey);
-     
+         
 -        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+         PTable table = metaDataCache.getIfPresent(cacheKey);
+         
          // We always cache the latest version - fault in if not in cache
 -        if (table != null || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
 +        if (table != null
 +                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
              if (table.getTimeStamp() < clientTimeStamp) {
                  // If the table is older than the client time stamp and its deleted, continue
                  if (isTableDeleted(table)) {
@@@ -1017,27 -724,26 +1008,27 @@@
          // Don't allow a table with views to be deleted
          // TODO: support CASCADE with DROP
          if (tableType == PTableType.TABLE && hasViews(region, tenantId, table)) {
 -            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
 +            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, 
 +              EnvironmentEdgeManager.currentTimeMillis(), null);
          }
          if (table.getType() != PTableType.VIEW) { // Add to list of HTables to delete, unless it's a view
-             tableNamesToDelete.add(table.getName().getBytes());
+             byte[] fullName = table.getName().getBytes();
+             tableNamesToDelete.add(fullName);
          }
          List<byte[]> indexNames = Lists.newArrayList();
          invalidateList.add(cacheKey);
          byte[][] rowKeyMetaData = new byte[5][];
 -        byte[] rowKey;
          do {
 -            KeyValue kv = results.get(LINK_TYPE_INDEX);
 -            rowKey = kv.getRow();
 -            int nColumns = getVarChars(rowKey, rowKeyMetaData);
 -            if (nColumns == 5 && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
 -                    && Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
 -                    && LinkType.fromSerializedValue(kv.getBuffer()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
 +            Cell kv = results.get(LINK_TYPE_INDEX);
 +            int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
 +            if (nColumns == 5
 +                    && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0
 +                    && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
 +                    && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
 +                    && LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
                  indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX]);
              }
-             @SuppressWarnings("deprecation")
 -            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
 +            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
              // FIXME: the version of the Delete constructor without the lock args was introduced
              // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
              // of the client.
@@@ -1050,8 -756,7 +1041,7 @@@
          // Recursively delete indexes
          for (byte[] indexName : indexNames) {
              byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
-             @SuppressWarnings("deprecation")
 -            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
 +            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
              // FIXME: the version of the Delete constructor without the lock args was introduced
              // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
              // of the client.
@@@ -1098,17 -796,13 +1088,16 @@@
                  ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                  List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
                  invalidateList.add(cacheKey);
-                 Map<ImmutableBytesPtr, PTable> metaDataCache =
-                         GlobalCache.getInstance(this.env).getMetaDataCache();
-                 PTable table = metaDataCache.get(cacheKey);
 -                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                 PTable table = metaDataCache.getIfPresent(cacheKey);
                  if (logger.isDebugEnabled()) {
                      if (table == null) {
 -                        logger.debug("Table " + Bytes.toStringBinary(key) + " not found in cache. Will build through scan");
 +                        logger.debug("Table " + Bytes.toStringBinary(key)
 +                                + " not found in cache. Will build through scan");
                      } else {
 -                        logger.debug("Table " + Bytes.toStringBinary(key) + " found in cache with timestamp " + table.getTimeStamp() + " seqNum " + table.getSequenceNumber());
 +                        logger.debug("Table " + Bytes.toStringBinary(key)
 +                                + " found in cache with timestamp " + table.getTimeStamp()
 +                                + " seqNum " + table.getSequenceNumber());
                      }
                  }
                  // Get client timeStamp from mutations
@@@ -1144,60 -826,42 +1133,48 @@@
                  }
                  if (expectedSeqNum != table.getSequenceNumber()) {
                      if (logger.isDebugEnabled()) {
 -                        logger.debug("For table " + Bytes.toStringBinary(key) + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
 +                        logger.debug("For table " + Bytes.toStringBinary(key)
 +                                + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
                      }
 -                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table);
 +                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
 +                            EnvironmentEdgeManager.currentTimeMillis(), table);
                  }
 -                
 +        
                  PTableType type = table.getType();
 -                if (type == PTableType.INDEX) { 
 +                if (type == PTableType.INDEX) {
                      // Disallow mutation of an index table
 -                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
 +                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
 +                            EnvironmentEdgeManager.currentTimeMillis(), null);
                  } else {
-                   PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
-                   // We said to drop a table, but found a view or visa versa
-                   if (type != expectedType) {
-                       return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
-                   }
-                   if (hasViews(region, tenantId, table)) {
-                       // Disallow any column mutations for parents of tenant tables
-                       return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-                   }
+                     // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
+                     PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesPtr());
+                     // We said to drop a table, but found a view or visa versa
+                     if (type != expectedType) {
+                         return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                     }
+                     if (hasViews(region, tenantId, table)) {
+                         // Disallow any column mutations for parents of tenant tables
+                         return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                     }
                  }
 -                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region, invalidateList, lids);
 +                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
 +                            invalidateList, locks);
                  if (result != null) {
                      return result;
                  }
 -                
 -                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
 +        
 +                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                  // Invalidate from cache
                  for (ImmutableBytesPtr invalidateKey : invalidateList) {
-                     PTable invalidatedTable = metaDataCache.remove(invalidateKey);
-                     if (logger.isDebugEnabled()) {
-                         if (invalidatedTable == null) {
-                             logger.debug("Attempted to invalidated table key "
-                                     + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
-                                         cacheKey.getLength()) + " but found no cached table");
-                         } else {
-                             logger.debug("Invalidated table key "
-                                     + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
-                                         cacheKey.getLength()) + " with timestamp "
-                                     + invalidatedTable.getTimeStamp() + " and seqNum "
-                                     + invalidatedTable.getSequenceNumber());
-                         }
-                     }
+                     metaDataCache.invalidate(invalidateKey);
                  }
 -                // Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
 +                // Get client timeStamp from mutations, since it may get updated by the
 +                // mutateRowsWithLocks call
                  long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
 -                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null);
 +                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime,
 +                        null);
              } finally {
 -                releaseLocks(region,lids);
 +                region.releaseRowLocks(locks);
              }
          } catch (Throwable t) {
              ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@@ -1240,167 -945,151 +1217,163 @@@
                                  } else {
                                      continue;
                                  }
 -                                if (columnToDelete.getViewConstant() != null) { // Disallow deletion of column referenced in WHERE clause of view
 -                                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
 -                                }
 -                                // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the index. If found as covered column, delete from index (do this client side?).
 -                                // In either case, invalidate index if the column is in it
 -                                for (PTable index : table.getIndexes()) {
 -                                    try {
 -                                        String indexColumnName = IndexUtil.getIndexColumnName(columnToDelete);
 -                                        PColumn indexColumn = index.getColumn(indexColumnName);
 -                                        byte[] indexKey = SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes());
 -                                        // If index contains the column in it's PK, then drop it
 -                                        if (SchemaUtil.isPKColumn(indexColumn)) {
 -                                            // Since we're dropping the index, lock it to ensure that a change in index state doesn't
 -                                            // occur while we're dropping it.
 -                                            acquireLock(region, indexKey, lids);
 -                                            // Drop the index table. The doDropTable will expand this to all of the table rows and invalidate the index table
 -                                            additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp, null));
 -                                            byte[] linkKey = MetaDataUtil.getParentLinkKey(tenantId, schemaName, tableName, index.getTableName().getBytes());
 -                                            // Drop the link between the data table and the index table
 -                                            additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp, null));
 -                                            doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes(), index.getType(), additionalTableMetaData, invalidateList, lids, tableNamesToDelete);
 -                                            // TODO: return in result?
 -                                        } else {
 -                                            invalidateList.add(new ImmutableBytesPtr(indexKey));
 -                                        }
 -                                    } catch (ColumnNotFoundException e) {
 -                                    } catch (AmbiguousColumnException e) {
 -                                    }
 -                                }
++
 +                                return new MetaDataMutationResult(
 +                                        MutationCode.COLUMN_ALREADY_EXISTS, EnvironmentEdgeManager
 +                                                .currentTimeMillis(), table);
                              } catch (ColumnFamilyNotFoundException e) {
 -                                return new MetaDataMutationResult(MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
 +                                continue;
                              } catch (ColumnNotFoundException e) {
 -                                return new MetaDataMutationResult(MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
 +                                if (addingPKColumn) {
 +                                    // Add all indexes to invalidate list, as they will all be
 +                                    // adding
 +                                    // the same PK column
 +                                    // No need to lock them, as we have the parent table lock at
 +                                    // this
 +                                    // point
 +                                    for (PTable index : table.getIndexes()) {
 +                                        invalidateList.add(new ImmutableBytesPtr(SchemaUtil
 +                                                .getTableKey(tenantId, index.getSchemaName()
 +                                                        .getBytes(), index.getTableName()
 +                                                        .getBytes())));
 +                                    }
 +                                }
 +                                continue;
                              }
                          }
                      }
 +                    return null;
                  }
 -                if (deletePKColumn) {
 -                    if (table.getPKColumns().size() == 1) {
 -                        return new MetaDataMutationResult(MutationCode.NO_PK_COLUMNS, EnvironmentEdgeManager.currentTimeMillis(), null);
 -                    }
 -                }
 -                tableMetaData.addAll(additionalTableMetaData);
 -                return null;
 -            }
 -        });
 -        
 -    }
 -    
 -    private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
 -        byte[] startKey = region.getStartKey();
 -        byte[] endKey = region.getEndKey();
 -        if (Bytes.compareTo(startKey, key) <= 0 && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, endKey) < 0)) {
 -            return null; // normal case;
 -        }
 -        return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION, EnvironmentEdgeManager.currentTimeMillis(), null);
 -    }
 -
 -    @Override
 -    public MetaDataMutationResult getTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long tableTimeStamp, long clientTimeStamp) throws IOException {
 -        try {
 -            byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
 -            
 -            // get the co-processor environment
 -            RegionCoprocessorEnvironment env = getEnvironment();
 -            // TODO: check that key is within region.getStartKey() and region.getEndKey()
 -            // and return special code to force client to lookup region from meta.
 -            HRegion region = env.getRegion();
 -            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
 +            });
              if (result != null) {
 -                return result; 
 +                done.run(MetaDataMutationResult.toProto(result));
              }
 -            
 -            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
 -            PTable table = doGetTable(key, clientTimeStamp);
 -            if (table == null) {
 -                return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTime, null);
 -            }
 -            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table.getTimeStamp() != tableTimeStamp ? table : null);
 -        } catch (Throwable t) {
 -            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
 -            return null; // impossible
 +        } catch (IOException ioe) {
 +            ProtobufUtil.setControllerException(controller, ioe);
          }
      }
 +    
 +    @Override
 +    public void dropColumn(RpcController controller, DropColumnRequest request,
 +            RpcCallback<MetaDataResponse> done) {
 +        List<Mutation> tableMetaData = null;
  
 -    private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
 -        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
 -        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
 -        PTable table = metaDataCache.getIfPresent(cacheKey);
 -        // We only cache the latest, so we'll end up building the table with every call if the client connection has specified an SCN.
 -        // TODO: If we indicate to the client that we're returning an older version, but there's a newer version available, the client
 -        // can safely not call this, since we only allow modifications to the latest.
 -        if (table != null && table.getTimeStamp() < clientTimeStamp) {
 -            // Table on client is up-to-date with table on server, so just return
 -            if (isTableDeleted(table)) {
 -                return null;
 -            }
 -            return table;
 -        }
 -        // Ask Lars about the expense of this call - if we don't take the lock, we still won't get partial results
 -        // get the co-processor environment
 -        RegionCoprocessorEnvironment env = getEnvironment();
 -        // TODO: check that key is within region.getStartKey() and region.getEndKey()
 -        // and return special code to force client to lookup region from meta.
 -        HRegion region = env.getRegion();
 -        /*
 -         * Lock directly on key, though it may be an index table.
 -         * This will just prevent a table from getting rebuilt
 -         * too often.
 -         */
 -        Integer lid = region.getLock(null, key, true);
 -        if (lid == null) {
 -            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
 -        }
          try {
 -            // Try cache again in case we were waiting on a lock
 -            table = metaDataCache.getIfPresent(cacheKey);
 -            // We only cache the latest, so we'll end up building the table with every call if the client connection has specified an SCN.
 -            // TODO: If we indicate to the client that we're returning an older version, but there's a newer version available, the client
 -            // can safely not call this, since we only allow modifications to the latest.
 -            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 -                // Table on client is up-to-date with table on server, so just return
 -                if (isTableDeleted(table)) {
 +            tableMetaData = ProtobufUtil.getMutations(request);
 +            final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
 +            final List<byte[]> tableNamesToDelete = Lists.newArrayList();
 +            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
-                 @SuppressWarnings("deprecation")
 +                @Override
 +                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
 +                        List<Mutation> tableMetaData, HRegion region,
 +                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks)
 +                        throws IOException, SQLException {
 +                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
 +                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
 +                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
 +                    boolean deletePKColumn = false;
 +                    List<Mutation> additionalTableMetaData = Lists.newArrayList();
 +                    for (Mutation m : tableMetaData) {
 +                        if (m instanceof Delete) {
 +                            byte[] key = m.getRow();
 +                            int pkCount = getVarChars(key, rowKeyMetaData);
 +                            if (pkCount > COLUMN_NAME_INDEX
 +                                    && Bytes.compareTo(schemaName,
 +                                        rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
 +                                    && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
++                                PColumn columnToDelete = null;
 +                                try {
-                                     PColumn columnToDelete = null;
 +                                    if (pkCount > FAMILY_NAME_INDEX
 +                                            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
 +                                        PColumnFamily family =
 +                                                table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
 +                                        columnToDelete =
 +                                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
 +                                    } else if (pkCount > COLUMN_NAME_INDEX
 +                                            && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
 +                                        deletePKColumn = true;
 +                                        columnToDelete = table.getPKColumn(new String(
 +                                          rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
 +                                    } else {
 +                                        continue;
 +                                    }
++                                    if (columnToDelete.getViewConstant() != null) { // Disallow deletion of column referenced in WHERE clause of view
++                                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
++                                    }
 +                                    // Look for columnToDelete in any indexes. If found as PK
 +                                    // column, get lock and drop the index. If found as covered
 +                                    // column, delete from index (do this client side?).
 +                                    // In either case, invalidate index if the column is in it
 +                                    for (PTable index : table.getIndexes()) {
 +                                        try {
-                                             String indexColumnName =
-                                                     IndexUtil.getIndexColumnName(columnToDelete);
++                                            String indexColumnName = IndexUtil.getIndexColumnName(columnToDelete);
 +                                            PColumn indexColumn = index.getColumn(indexColumnName);
 +                                            byte[] indexKey =
 +                                                    SchemaUtil.getTableKey(tenantId, index
-                                                             .getSchemaName().getBytes(), index
-                                                             .getTableName().getBytes());
++                                                            .getSchemaName().getBytes(), index.getTableName().getBytes());
 +                                            // If index contains the column in it's PK, then drop it
 +                                            if (SchemaUtil.isPKColumn(indexColumn)) {
 +                                                // Since we're dropping the index, lock it to ensure
 +                                                // that a change in index state doesn't
 +                                                // occur while we're dropping it.
 +                                                acquireLock(region, indexKey, locks);
 +                                                // Drop the index table. The doDropTable will expand
 +                                                // this to all of the table rows and invalidate the
 +                                                // index table
-                                                 additionalTableMetaData.add(new Delete(indexKey,
-                                                         clientTimeStamp));
-                                                 byte[] linkKey =
-                                                         MetaDataUtil.getParentLinkKey(tenantId,
-                                                             schemaName, tableName, index
-                                                                     .getTableName().getBytes());
++                                                additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp));
++                                                byte[] linkKey = MetaDataUtil.getParentLinkKey(tenantId,
++                                                            schemaName, tableName, index.getTableName().getBytes());
 +                                                // Drop the link between the data table and the
 +                                                // index table
-                                                 additionalTableMetaData.add(new Delete(linkKey,
-                                                         clientTimeStamp));
-                                                 doDropTable(indexKey, tenantId, index
-                                                         .getSchemaName().getBytes(), index
-                                                         .getTableName().getBytes(),
-                                                     index.getType(), additionalTableMetaData,
-                                                     invalidateList, locks, tableNamesToDelete);
++                                                additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
++                                                doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes(),
++                                                    index.getType(), additionalTableMetaData, invalidateList, locks, tableNamesToDelete);
 +                                                // TODO: return in result?
 +                                            } else {
 +                                                invalidateList.add(new ImmutableBytesPtr(indexKey));
 +                                            }
 +                                        } catch (ColumnNotFoundException e) {
 +                                        } catch (AmbiguousColumnException e) {
 +                                        }
 +                                    }
 +                                } catch (ColumnFamilyNotFoundException e) {
 +                                    return new MetaDataMutationResult(
 +                                            MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
-                                                     .currentTimeMillis(), table);
++                                                    .currentTimeMillis(), table, columnToDelete);
 +                                } catch (ColumnNotFoundException e) {
 +                                    return new MetaDataMutationResult(
 +                                            MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
-                                                     .currentTimeMillis(), table);
++                                                    .currentTimeMillis(), table, columnToDelete);
 +                                }
 +                            }
 +                        }
 +                    }
 +                    if (deletePKColumn) {
 +                        if (table.getPKColumns().size() == 1) {
 +                            return new MetaDataMutationResult(MutationCode.NO_PK_COLUMNS,
 +                                    EnvironmentEdgeManager.currentTimeMillis(), null);
 +                        }
 +                    }
 +                    tableMetaData.addAll(additionalTableMetaData);
                      return null;
                  }
 -                return table;
 -            }
 -            // Query for the latest table first, since it's not cached
 -            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
 -            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 -                return table;
 +            });
 +            if (result != null) {
 +                done.run(MetaDataMutationResult.toProto(result));
              }
 -            // Otherwise, query for an older version of the table - it won't be cached 
 -            return buildTable(key, cacheKey, region, clientTimeStamp);
 -        } finally {
 -            if (lid != null) region.releaseRowLock(lid);
 +        } catch (IOException ioe) {
 +            ProtobufUtil.setControllerException(controller, ioe);
          }
      }
  
      @Override
 -    public void clearCache() {
 -        GlobalCache cache = GlobalCache.getInstance(this.getEnvironment());
 -        Cache<ImmutableBytesPtr,PTable> metaDataCache = cache.getMetaDataCache();
 +    public void clearCache(RpcController controller, ClearCacheRequest request,
 +            RpcCallback<ClearCacheResponse> done) {
-         Map<ImmutableBytesPtr, PTable> metaDataCache =
++        GlobalCache cache = GlobalCache.getInstance(this.env);
++        Cache<ImmutableBytesPtr, PTable> metaDataCache =
 +                GlobalCache.getInstance(this.env).getMetaDataCache();
-         metaDataCache.clear();
+         metaDataCache.invalidateAll();
+         cache.clearTenantCache();
      }
  
      @Override
@@@ -1491,37 -1159,28 +1464,36 @@@
                  if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
                      timeStamp = currentStateKV.getTimestamp();
                  }
 -                if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE) || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
 +                if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
 +                        || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
                      newState = PIndexState.INACTIVE;
 -                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
 +                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
 +                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                  } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
                      newState = PIndexState.ACTIVE;
 -                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
 +                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
 +                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                  }
                  if (currentState != newState) {
 -                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
 +                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                      // Invalidate from cache
-                     Map<ImmutableBytesPtr, PTable> metaDataCache =
-                             GlobalCache.getInstance(this.env).getMetaDataCache();
-                     metaDataCache.remove(cacheKey);
 -                    Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
++                    Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                     metaDataCache.invalidate(cacheKey);
                  }
 -                // Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
 +                // Get client timeStamp from mutations, since it may get updated by the
 +                // mutateRowsWithLocks call
                  long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
 -                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null);
 +                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +                builder.setMutationTime(currentTime);
 +                done.run(builder.build());
 +                return;
              } finally {
 -                region.releaseRowLock(lid);
 +                rowLock.release();
              }
          } catch (Throwable t) {
 -            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
 -            return null; // impossible
 +          logger.error("updateIndexState failed", t);
 +            ProtobufUtil.setControllerException(controller,
 +                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
          }
      }
      

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/b3a330ca/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------


Mime
View raw message