phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject [37/50] [abbrv] phoenix git commit: PHOENIX-2111 Race condition on creation of new view and adding of column to base table
Date Mon, 20 Jul 2015 17:15:07 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index feb5989..52b038b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.query;
 
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
@@ -110,6 +113,7 @@ import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -966,7 +970,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                             BlockingRpcCallback<GetVersionResponse> rpcCallback =
                                     new BlockingRpcCallback<GetVersionResponse>();
                             GetVersionRequest.Builder builder = GetVersionRequest.newBuilder();
-
+                            builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                             instance.getVersion(controller, builder.build(), rpcCallback);
                             if(controller.getFailedOn() != null) {
                                 throw controller.getFailedOn();
@@ -1265,6 +1269,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                             MutationProto mp = ProtobufUtil.toProto(m);
                             builder.addTableMetadataMutations(mp.toByteString());
                         }
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.createTable(controller, builder.build(), rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();
@@ -1293,12 +1298,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                     builder.setTableName(ByteStringer.wrap(tableBytes));
                     builder.setTableTimestamp(tableTimestamp);
                     builder.setClientTimestamp(clientTimestamp);
-
-                   instance.getTable(controller, builder.build(), rpcCallback);
-                   if(controller.getFailedOn() != null) {
-                       throw controller.getFailedOn();
-                   }
-                   return rpcCallback.get();
+                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                    instance.getTable(controller, builder.build(), rpcCallback);
+                    if(controller.getFailedOn() != null) {
+                        throw controller.getFailedOn();
+                    }
+                    return rpcCallback.get();
                 }
             });
     }
@@ -1325,7 +1330,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         }
                         builder.setTableType(tableType.getSerializedValue());
                         builder.setCascade(cascade);
-
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.dropTable(controller, builder.build(), rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();
@@ -1379,6 +1384,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                             builder.addTableMetadataMutations(mp.toByteString());
                         }
                         builder.setIfExists(ifExists);
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.dropFunction(controller, builder.build(), rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();
@@ -1553,7 +1559,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         MutationProto mp = ProtobufUtil.toProto(m);
                         builder.addTableMetadataMutations(mp.toByteString());
                     }
-
+                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                     instance.addColumn(controller, builder.build(), rpcCallback);
                     if(controller.getFailedOn() != null) {
                         throw controller.getFailedOn();
@@ -1804,6 +1810,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         MutationProto mp = ProtobufUtil.toProto(m);
                         builder.addTableMetadataMutations(mp.toByteString());
                     }
+                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                     instance.dropColumn(controller, builder.build(), rpcCallback);
                     if(controller.getFailedOn() != null) {
                         throw controller.getFailedOn();
@@ -2125,6 +2132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         BlockingRpcCallback<ClearCacheResponse> rpcCallback =
                                 new BlockingRpcCallback<ClearCacheResponse>();
                         ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.clearCache(controller, builder.build(), rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();
@@ -2189,23 +2197,24 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
         byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
         return metaDataCoprocessorExec(tableKey,
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
-                    @Override
-                    public MetaDataResponse call(MetaDataService instance) throws IOException
{
-                        ServerRpcController controller = new ServerRpcController();
-                        BlockingRpcCallback<MetaDataResponse> rpcCallback =
-                                new BlockingRpcCallback<MetaDataResponse>();
-                        UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
-                        for (Mutation m : tableMetaData) {
-                            MutationProto mp = ProtobufUtil.toProto(m);
-                            builder.addTableMetadataMutations(mp.toByteString());
-                        }
-                        instance.updateIndexState(controller, builder.build(), rpcCallback);
-                        if(controller.getFailedOn() != null) {
-                            throw controller.getFailedOn();
-                        }
-                        return rpcCallback.get();
-                    }
-                });
+            @Override
+            public MetaDataResponse call(MetaDataService instance) throws IOException {
+                ServerRpcController controller = new ServerRpcController();
+                BlockingRpcCallback<MetaDataResponse> rpcCallback =
+                        new BlockingRpcCallback<MetaDataResponse>();
+                UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
+                for (Mutation m : tableMetaData) {
+                    MutationProto mp = ProtobufUtil.toProto(m);
+                    builder.addTableMetadataMutations(mp.toByteString());
+                }
+                builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                instance.updateIndexState(controller, builder.build(), rpcCallback);
+                if(controller.getFailedOn() != null) {
+                    throw controller.getFailedOn();
+                }
+                return rpcCallback.get();
+            }
+        });
     }
 
     @Override
@@ -2408,6 +2417,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                                 builder.setTableName(ByteStringer.wrap(tableName));
                                 builder.setSchemaName(ByteStringer.wrap(schemaName));
                                 builder.setClientTimestamp(clientTS);
+                                builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                                 instance.clearTableFromCache(controller, builder.build(),
rpcCallback);
                                 if (controller.getFailedOn() != null) { throw controller.getFailedOn();
}
                                 return rpcCallback.get();
@@ -2696,12 +2706,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         builder.addFunctionTimestamps(function.getSecond().longValue());
                     }
                     builder.setClientTimestamp(clientTimestamp);
-
-                   instance.getFunctions(controller, builder.build(), rpcCallback);
-                   if(controller.getFailedOn() != null) {
-                       throw controller.getFailedOn();
-                   }
-                   return rpcCallback.get();
+                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                    instance.getFunctions(controller, builder.build(), rpcCallback);
+                    if(controller.getFailedOn() != null) {
+                        throw controller.getFailedOn();
+                    }
+                    return rpcCallback.get();
                 }
             }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
 
@@ -2732,6 +2742,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                         }
                         builder.setTemporary(temporary);
                         builder.setReplace(function.isReplace());
+                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.createFunction(controller, builder.build(), rpcCallback);
                         if(controller.getFailedOn() != null) {
                             throw controller.getFailedOn();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b1fcf30..f74133a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -234,8 +234,9 @@ public class MetaDataClient {
             TABLE_SCHEM + "," +
             TABLE_NAME + "," +
             COLUMN_FAMILY + "," +
-            LINK_TYPE +
-            ") VALUES (?, ?, ?, ?, ?)";
+            LINK_TYPE + "," +
+            TABLE_SEQ_NUM + // this is actually set to the parent table's sequence number
+            ") VALUES (?, ?, ?, ?, ?, ?)";
     private static final String CREATE_VIEW_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"(
" +
             TENANT_ID + "," +
@@ -1490,6 +1491,7 @@ public class MetaDataClient {
                 linkStatement.setString(3, parentTableName);
                 linkStatement.setString(4, tableName);
                 linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue());
+                linkStatement.setLong(6, parent.getSequenceNumber());
                 linkStatement.execute();
             }
 
@@ -1649,6 +1651,12 @@ public class MetaDataClient {
                         linkStatement.setString(3, tableName);
                         linkStatement.setString(4, physicalName.getString());
                         linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
+                        if (tableType == PTableType.VIEW) {
+                            PTable physicalTable = connection.getMetaDataCache().getTable(new
PTableKey(null, physicalName.getString()));
+                            linkStatement.setLong(6, physicalTable.getSequenceNumber());
+                        } else {
+                            linkStatement.setLong(6, parent.getSequenceNumber());
+                        }
                         linkStatement.execute();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index fbc15be..ee73a58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -51,6 +51,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SortOrder;
@@ -59,6 +60,7 @@ import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -437,4 +439,16 @@ public class MetaDataUtil {
         scan.setStopRow(stopKey);
         return scan;
     }
+    
+    public static LinkType getLinkType(Mutation tableMutation) {
+        List<Cell> kvs = tableMutation.getFamilyCellMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
+        if (kvs != null) {
+            for (Cell kv : kvs) {
+                if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
PhoenixDatabaseMetaData.LINK_TYPE_BYTES, 0, PhoenixDatabaseMetaData.LINK_TYPE_BYTES.length)
== 0) {
+                    return LinkType.fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().decodeByte(kv.getValueArray(),
kv.getValueOffset(), SortOrder.getDefault()));
+                }
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 61642bc..1a2019d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -83,8 +83,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -99,8 +97,6 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class PhoenixRuntime {
-    private static final Logger logger = LoggerFactory.getLogger(PhoenixRuntime.class);
-
     /**
      * Use this connection property to control HBase timestamps
      * by specifying your own long timestamp value at connection time. All

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 0ad6b9d..e1e2515 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -466,8 +466,8 @@ public class UpgradeUtil {
     public static void upgradeTo4_5_0(PhoenixConnection oldMetaConnection) throws SQLException
{
         PhoenixConnection metaConnection = null;
         try {
-            // Need to use own connection without any SCN to be able to read all data from
SYSTEM.CATALOG 
-            metaConnection = new PhoenixConnection(oldMetaConnection);
+            // Need to use own connection with max time stamp to be able to read all data
from SYSTEM.CATALOG 
+            metaConnection = new PhoenixConnection(oldMetaConnection, HConstants.LATEST_TIMESTAMP);
             logger.info("Upgrading metadata to support adding columns to tables with views");
             String getBaseTableAndViews = "SELECT "
                     + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-protocol/src/main/MetaDataService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index e79f846..c265158 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -61,6 +61,7 @@ message GetTableRequest {
   required bytes tableName = 3;
   required int64 tableTimestamp = 4;
   required int64 clientTimestamp = 5;
+  optional int32 clientVersion = 6;
 }
 
 message GetFunctionsRequest {
@@ -68,11 +69,13 @@ message GetFunctionsRequest {
   repeated bytes functionNames = 2;
   repeated int64 functionTimestamps = 3;
   required int64 clientTimestamp = 4;
+  optional int32 clientVersion = 5;
 }
 
 // each byte array represents a MutationProto instance
 message CreateTableRequest {
-  repeated bytes tableMetadataMutations = 1; 
+  repeated bytes tableMetadataMutations = 1;
+  optional int32 clientVersion = 2;
 }
 
 // each byte array represents a MutationProto instance
@@ -80,38 +83,46 @@ message CreateFunctionRequest {
   repeated bytes tableMetadataMutations = 1;
   required bool temporary = 2;
   optional bool replace = 3;
+  optional int32 clientVersion = 4;
 }
 
 message DropTableRequest {
   repeated bytes tableMetadataMutations = 1;
   required string tableType = 2;
   optional bool cascade = 3;
+  optional int32 clientVersion = 4;
 }
 
 message AddColumnRequest {
   repeated bytes tableMetadataMutations = 1;
+  optional int32 clientVersion = 2;
 }
 
 message DropColumnRequest {
   repeated bytes tableMetadataMutations = 1;
+  optional int32 clientVersion = 2;
 }
 
 message DropFunctionRequest {
   repeated bytes tableMetadataMutations = 1;
   optional bool ifExists = 2;
+  optional int32 clientVersion = 3;
 }
 
 message UpdateIndexStateRequest {
   repeated bytes tableMetadataMutations = 1;
+  optional int32 clientVersion = 2;
 }
 
 message ClearCacheRequest {
+  optional int32 clientVersion = 1;
 }
 
 message ClearCacheResponse {
 }
 
 message GetVersionRequest {
+  optional int32 clientVersion = 1;
 }
 
 message GetVersionResponse {
@@ -123,6 +134,7 @@ message ClearTableFromCacheRequest {
   required bytes schemaName  = 2;
   required bytes tableName = 3;
   required int64 clientTimestamp = 4;
+  optional int32 clientVersion = 5;
 }
 
 message ClearTableFromCacheResponse {


Mime
View raw message