cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject [08/11] cassandra git commit: Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum
Date Mon, 31 Oct 2016 13:38:25 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index cd181cc..176ab84 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
@@ -143,7 +144,7 @@ public class UserType extends TupleType
         return ShortType.instance;
     }
 
-    public ByteBuffer serializeForNativeProtocol(Iterator<Cell> cells, int protocolVersion)
+    public ByteBuffer serializeForNativeProtocol(Iterator<Cell> cells, ProtocolVersion protocolVersion)
     {
         assert isMultiCell;
 
@@ -249,7 +250,7 @@ public class UserType extends TupleType
     }
 
     @Override
-    public String toJSONString(ByteBuffer buffer, int protocolVersion)
+    public String toJSONString(ByteBuffer buffer, ProtocolVersion protocolVersion)
     {
         ByteBuffer[] buffers = split(buffer);
         StringBuilder sb = new StringBuilder("{");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index f1ee3c1..2dffe58 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -856,7 +856,7 @@ public final class SchemaKeyspace
                .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
                .add("initcond", aggregate.initialCondition() != null
                                 // must use the frozen state type here, as 'null' for unfrozen collections may mean 'empty'
-                                ? aggregate.stateType().freeze().asCQL3Type().toCQLLiteral(aggregate.initialCondition(), Server.CURRENT_VERSION)
+                                ? aggregate.stateType().freeze().asCQL3Type().toCQLLiteral(aggregate.initialCondition(), ProtocolVersion.CURRENT)
                                 : null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 3d6be67..95a0388 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class CollectionSerializer<T> implements TypeSerializer<T>
@@ -30,14 +30,14 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
     protected abstract List<ByteBuffer> serializeValues(T value);
     protected abstract int getElementCount(T value);
 
-    public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version);
-    public abstract void validateForNativeProtocol(ByteBuffer buffer, int version);
+    public abstract T deserializeForNativeProtocol(ByteBuffer buffer, ProtocolVersion version);
+    public abstract void validateForNativeProtocol(ByteBuffer buffer, ProtocolVersion version);
 
     public ByteBuffer serialize(T value)
     {
         List<ByteBuffer> values = serializeValues(value);
         // See deserialize() for why using the protocol v3 variant is the right thing to do.
-        return pack(values, getElementCount(value), Server.VERSION_3);
+        return pack(values, getElementCount(value), ProtocolVersion.V3);
     }
 
     public T deserialize(ByteBuffer bytes)
@@ -47,16 +47,16 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         //  1) when collections are frozen
         //  2) for internal calls.
         // In both case, using the protocol 3 version variant is the right thing to do.
-        return deserializeForNativeProtocol(bytes, Server.VERSION_3);
+        return deserializeForNativeProtocol(bytes, ProtocolVersion.V3);
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         // Same thing as above
-        validateForNativeProtocol(bytes, Server.VERSION_3);
+        validateForNativeProtocol(bytes, ProtocolVersion.V3);
     }
 
-    public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements, int version)
+    public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements, ProtocolVersion version)
     {
         int size = 0;
         for (ByteBuffer bb : buffers)
@@ -69,22 +69,22 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         return (ByteBuffer)result.flip();
     }
 
-    protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
+    protected static void writeCollectionSize(ByteBuffer output, int elements, ProtocolVersion version)
     {
             output.putInt(elements);
     }
 
-    public static int readCollectionSize(ByteBuffer input, int version)
+    public static int readCollectionSize(ByteBuffer input, ProtocolVersion version)
     {
         return input.getInt();
     }
 
-    protected static int sizeOfCollectionSize(int elements, int version)
+    protected static int sizeOfCollectionSize(int elements, ProtocolVersion version)
     {
         return 4;
     }
 
-    public static void writeValue(ByteBuffer output, ByteBuffer value, int version)
+    public static void writeValue(ByteBuffer output, ByteBuffer value, ProtocolVersion version)
     {
         if (value == null)
         {
@@ -96,7 +96,7 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         output.put(value.duplicate());
     }
 
-    public static ByteBuffer readValue(ByteBuffer input, int version)
+    public static ByteBuffer readValue(ByteBuffer input, ProtocolVersion version)
     {
         int size = input.getInt();
         if (size < 0)
@@ -105,7 +105,7 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         return ByteBufferUtil.readBytes(input, size);
     }
 
-    public static int sizeOfValue(ByteBuffer value, int version)
+    public static int sizeOfValue(ByteBuffer value, ProtocolVersion version)
     {
         return value == null ? 4 : 4 + value.remaining();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index 3fd0803..44c33a6 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.serializers;
 
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
@@ -60,7 +60,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         return value.size();
     }
 
-    public void validateForNativeProtocol(ByteBuffer bytes, int version)
+    public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {
@@ -78,7 +78,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         }
     }
 
-    public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
+    public List<T> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {
@@ -130,7 +130,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         try
         {
             ByteBuffer input = serializedList.duplicate();
-            int n = readCollectionSize(input, Server.VERSION_3);
+            int n = readCollectionSize(input, ProtocolVersion.V3);
             if (n <= index)
                 return null;
 
@@ -139,7 +139,7 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
                 int length = input.getInt();
                 input.position(input.position() + length);
             }
-            return readValue(input, Server.VERSION_3);
+            return readValue(input, ProtocolVersion.V3);
         }
         catch (BufferUnderflowException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index 67e5637..1722832 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.Pair;
 
 public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
@@ -74,7 +74,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         return value.size();
     }
 
-    public void validateForNativeProtocol(ByteBuffer bytes, int version)
+    public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {
@@ -94,7 +94,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         }
     }
 
-    public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version)
+    public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {
@@ -141,11 +141,11 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         try
         {
             ByteBuffer input = serializedMap.duplicate();
-            int n = readCollectionSize(input, Server.VERSION_3);
+            int n = readCollectionSize(input, ProtocolVersion.V3);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer kbb = readValue(input, Server.VERSION_3);
-                ByteBuffer vbb = readValue(input, Server.VERSION_3);
+                ByteBuffer kbb = readValue(input, ProtocolVersion.V3);
+                ByteBuffer vbb = readValue(input, ProtocolVersion.V3);
                 int comparison = keyType.compare(kbb, serializedKey);
                 if (comparison == 0)
                     return vbb;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index da7744b..a440234 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -22,6 +22,8 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.transport.ProtocolVersion;
+
 public class SetSerializer<T> extends CollectionSerializer<Set<T>>
 {
     // interning instances
@@ -61,7 +63,7 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         return value.size();
     }
 
-    public void validateForNativeProtocol(ByteBuffer bytes, int version)
+    public void validateForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {
@@ -78,7 +80,7 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         }
     }
 
-    public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
+    public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, ProtocolVersion version)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4354c32..07eb1d8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -91,6 +91,7 @@ import org.apache.cassandra.thrift.EndpointDetails;
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventType;
@@ -602,7 +603,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
         logger.info("Thrift API version: {}", cassandraConstants.VERSION);
         logger.info("CQL supported versions: {} (default: {})",
-                StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
+                StringUtils.join(ClientState.getCQLSupportedVersion(), ", "), ClientState.DEFAULT_CQL_VERSION);
+        logger.info("Native protocol supported versions: {} (default: {})",
+                    StringUtils.join(ProtocolVersion.supportedVersions(), ", "), ProtocolVersion.CURRENT);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index d9b3632..22ddc84 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -23,12 +23,13 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 abstract class AbstractQueryPager implements QueryPager
 {
     protected final ReadCommand command;
     protected final DataLimits limits;
-    protected final int protocolVersion;
+    protected final ProtocolVersion protocolVersion;
 
     private int remaining;
 
@@ -40,7 +41,7 @@ abstract class AbstractQueryPager implements QueryPager
 
     private boolean exhausted;
 
-    protected AbstractQueryPager(ReadCommand command, int protocolVersion)
+    protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion)
     {
         this.command = command;
         this.protocolVersion = protocolVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 75cc71f..da388d0 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.pager;
 
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.AbstractIterator;
 
 import java.util.Arrays;
@@ -53,7 +54,7 @@ public class MultiPartitionPager implements QueryPager
     private int remaining;
     private int current;
 
-    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion)
+    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion)
     {
         this.limit = group.limits();
         this.nowInSec = group.nowInSec();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 30e14c3..4a9ac39 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -52,7 +52,7 @@ public class PagingState
         this.remainingInPartition = remainingInPartition;
     }
 
-    public static PagingState deserialize(ByteBuffer bytes, int protocolVersion)
+    public static PagingState deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion)
     {
         if (bytes == null)
             return null;
@@ -62,7 +62,7 @@ public class PagingState
             ByteBuffer pk;
             RowMark mark;
             int remaining, remainingInPartition;
-            if (protocolVersion <= Server.VERSION_3)
+            if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3))
             {
                 pk = ByteBufferUtil.readWithShortLength(in);
                 mark = new RowMark(ByteBufferUtil.readWithShortLength(in), protocolVersion);
@@ -91,14 +91,14 @@ public class PagingState
         }
     }
 
-    public ByteBuffer serialize(int protocolVersion)
+    public ByteBuffer serialize(ProtocolVersion protocolVersion)
     {
         assert rowMark == null || protocolVersion == rowMark.protocolVersion;
         try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize(protocolVersion)))
         {
             ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
             ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
-            if (protocolVersion <= Server.VERSION_3)
+            if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3))
             {
                 ByteBufferUtil.writeWithShortLength(pk, out);
                 ByteBufferUtil.writeWithShortLength(mark, out);
@@ -120,12 +120,12 @@ public class PagingState
         }
     }
 
-    public int serializedSize(int protocolVersion)
+    public int serializedSize(ProtocolVersion protocolVersion)
     {
         assert rowMark == null || protocolVersion == rowMark.protocolVersion;
         ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
         ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
-        if (protocolVersion <= Server.VERSION_3)
+        if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3))
         {
             return ByteBufferUtil.serializedSizeWithShortLength(pk)
                  + ByteBufferUtil.serializedSizeWithShortLength(mark)
@@ -180,9 +180,9 @@ public class PagingState
     {
         // This can be null for convenience if no row is marked.
         private final ByteBuffer mark;
-        private final int protocolVersion;
+        private final ProtocolVersion protocolVersion;
 
-        private RowMark(ByteBuffer mark, int protocolVersion)
+        private RowMark(ByteBuffer mark, ProtocolVersion protocolVersion)
         {
             this.mark = mark;
             this.protocolVersion = protocolVersion;
@@ -202,10 +202,10 @@ public class PagingState
             return l;
         }
 
-        public static RowMark create(CFMetaData metadata, Row row, int protocolVersion)
+        public static RowMark create(CFMetaData metadata, Row row, ProtocolVersion protocolVersion)
         {
             ByteBuffer mark;
-            if (protocolVersion <= Server.VERSION_3)
+            if (protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3))
             {
                 // We need to be backward compatible with 2.1/2.2 nodes paging states. Which means we have to send
                 // the full cellname of the "last" cell in the row we get (since that's how 2.1/2.2 nodes will start after
@@ -238,7 +238,7 @@ public class PagingState
             if (mark == null)
                 return null;
 
-            return protocolVersion <= Server.VERSION_3
+            return protocolVersion.isSmallerOrEqualTo(ProtocolVersion.V3)
                  ? LegacyLayout.decodeClustering(metadata, mark)
                  : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 5a7cccf..5ba13a4 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
  * Pages a PartitionRangeReadCommand.
@@ -38,7 +39,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
     private volatile DecoratedKey lastReturnedKey;
     private volatile PagingState.RowMark lastReturnedRow;
 
-    public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion)
+    public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion)
     {
         super(command, protocolVersion);
 
@@ -51,7 +52,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
     }
 
     public PartitionRangeQueryPager(ReadCommand command,
-                                    int protocolVersion,
+                                    ProtocolVersion protocolVersion,
                                     DecoratedKey lastReturnedKey,
                                     PagingState.RowMark lastReturnedRow,
                                     int remaining,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 7fb4e70..15311ab 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
  * Static utility methods for paging.
@@ -49,7 +49,7 @@ public class QueryPagers
                                  long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
-        final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
+        final SinglePartitionPager pager = new SinglePartitionPager(command, null, ProtocolVersion.CURRENT);
 
         int count = 0;
         while (!pager.isExhausted())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 59b2a51..e400fb6 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
  * Common interface to single partition queries (by slice and by name).
@@ -34,7 +35,7 @@ public class SinglePartitionPager extends AbstractQueryPager
 
     private volatile PagingState.RowMark lastReturned;
 
-    public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, int protocolVersion)
+    public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion)
     {
         super(command, protocolVersion);
         this.command = command;
@@ -47,7 +48,7 @@ public class SinglePartitionPager extends AbstractQueryPager
     }
 
     private SinglePartitionPager(SinglePartitionReadCommand command,
-                                 int protocolVersion,
+                                 ProtocolVersion protocolVersion,
                                  PagingState.RowMark rowMark,
                                  int remaining,
                                  int remainingInPartition)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 0ef619e..9b0847b 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 public interface CBCodec<T>
 {
-    public T decode(ByteBuf body, int version);
-    public void encode(T t, ByteBuf dest, int version);
-    public int encodedSize(T t, int version);
+    public T decode(ByteBuf body, ProtocolVersion version);
+    public void encode(T t, ByteBuf dest, ProtocolVersion version);
+    public int encodedSize(T t, ProtocolVersion version);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 91b9cc7..66e5e73 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -391,12 +391,12 @@ public abstract class CBUtil
         return ByteBuffer.wrap(readRawBytes(slice));
     }
 
-    public static ByteBuffer readBoundValue(ByteBuf cb, int protocolVersion)
+    public static ByteBuffer readBoundValue(ByteBuf cb, ProtocolVersion protocolVersion)
     {
         int length = cb.readInt();
         if (length < 0)
         {
-            if (protocolVersion < Server.VERSION_4) // backward compatibility for pre-version 4
+            if (protocolVersion.isSmallerThan(ProtocolVersion.V4)) // backward compatibility for pre-version 4
                 return null;
             if (length == -1)
                 return null;
@@ -454,7 +454,7 @@ public abstract class CBUtil
         return 4 + (valueSize < 0 ? 0 : valueSize);
     }
 
-    public static List<ByteBuffer> readValueList(ByteBuf cb, int protocolVersion)
+    public static List<ByteBuffer> readValueList(ByteBuf cb, ProtocolVersion protocolVersion)
     {
         int size = cb.readUnsignedShort();
         if (size == 0)
@@ -481,7 +481,7 @@ public abstract class CBUtil
         return size;
     }
 
-    public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb, int protocolVersion)
+    public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb, ProtocolVersion protocolVersion)
     {
         int size = cb.readUnsignedShort();
         if (size == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index f6216e1..e428b06 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -43,7 +43,7 @@ public class Client extends SimpleClient
 {
     private final SimpleEventHandler eventHandler = new SimpleEventHandler();
 
-    public Client(String host, int port, int version, ClientEncryptionOptions encryptionOptions)
+    public Client(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
     {
         super(host, port, version, encryptionOptions);
         setEventHandler(eventHandler);
@@ -136,7 +136,7 @@ public class Client extends SimpleClient
                     return null;
                 }
             }
-            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
+            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version));
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -251,7 +251,7 @@ public class Client extends SimpleClient
         // Parse options.
         String host = args[0];
         int port = Integer.parseInt(args[1]);
-        int version = args.length == 3 ? Integer.parseInt(args[2]) : Server.CURRENT_VERSION;
+        ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT;
 
         ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions();
         System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index af26557..a04a055 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -25,12 +25,12 @@ public class Connection
     static final AttributeKey<Connection> attributeKey = AttributeKey.valueOf("CONN");
 
     private final Channel channel;
-    private final int version;
+    private final ProtocolVersion version;
     private final Tracker tracker;
 
     private volatile FrameCompressor frameCompressor;
 
-    public Connection(Channel channel, int version, Tracker tracker)
+    public Connection(Channel channel, ProtocolVersion version, Tracker tracker)
     {
         this.channel = channel;
         this.version = version;
@@ -54,7 +54,7 @@ public class Connection
         return tracker;
     }
 
-    public int getVersion()
+    public ProtocolVersion getVersion()
     {
         return version;
     }
@@ -66,7 +66,7 @@ public class Connection
 
     public interface Factory
     {
-        Connection newConnection(Channel channel, int version);
+        Connection newConnection(Channel channel, ProtocolVersion version);
     }
 
     public interface Tracker

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index eb1f1f4..aef3fa1 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -35,37 +35,37 @@ import org.apache.cassandra.utils.Pair;
 
 public enum DataType implements OptionCodec.Codecable<DataType>
 {
-    CUSTOM   (0,  null, 1),
-    ASCII    (1,  AsciiType.instance, 1),
-    BIGINT   (2,  LongType.instance, 1),
-    BLOB     (3,  BytesType.instance, 1),
-    BOOLEAN  (4,  BooleanType.instance, 1),
-    COUNTER  (5,  CounterColumnType.instance, 1),
-    DECIMAL  (6,  DecimalType.instance, 1),
-    DOUBLE   (7,  DoubleType.instance, 1),
-    FLOAT    (8,  FloatType.instance, 1),
-    INT      (9,  Int32Type.instance, 1),
-    TEXT     (10, UTF8Type.instance, 1),
-    TIMESTAMP(11, TimestampType.instance, 1),
-    UUID     (12, UUIDType.instance, 1),
-    VARCHAR  (13, UTF8Type.instance, 1),
-    VARINT   (14, IntegerType.instance, 1),
-    TIMEUUID (15, TimeUUIDType.instance, 1),
-    INET     (16, InetAddressType.instance, 1),
-    DATE     (17, SimpleDateType.instance, 4),
-    TIME     (18, TimeType.instance, 4),
-    SMALLINT (19, ShortType.instance, 4),
-    BYTE     (20, ByteType.instance, 4),
-    LIST     (32, null, 1),
-    MAP      (33, null, 1),
-    SET      (34, null, 1),
-    UDT      (48, null, 3),
-    TUPLE    (49, null, 3);
+    CUSTOM   (0,  null, ProtocolVersion.V1),
+    ASCII    (1,  AsciiType.instance, ProtocolVersion.V1),
+    BIGINT   (2,  LongType.instance, ProtocolVersion.V1),
+    BLOB     (3,  BytesType.instance, ProtocolVersion.V1),
+    BOOLEAN  (4,  BooleanType.instance, ProtocolVersion.V1),
+    COUNTER  (5,  CounterColumnType.instance, ProtocolVersion.V1),
+    DECIMAL  (6,  DecimalType.instance, ProtocolVersion.V1),
+    DOUBLE   (7,  DoubleType.instance, ProtocolVersion.V1),
+    FLOAT    (8,  FloatType.instance, ProtocolVersion.V1),
+    INT      (9,  Int32Type.instance, ProtocolVersion.V1),
+    TEXT     (10, UTF8Type.instance, ProtocolVersion.V1),
+    TIMESTAMP(11, TimestampType.instance, ProtocolVersion.V1),
+    UUID     (12, UUIDType.instance, ProtocolVersion.V1),
+    VARCHAR  (13, UTF8Type.instance, ProtocolVersion.V1),
+    VARINT   (14, IntegerType.instance, ProtocolVersion.V1),
+    TIMEUUID (15, TimeUUIDType.instance, ProtocolVersion.V1),
+    INET     (16, InetAddressType.instance, ProtocolVersion.V1),
+    DATE     (17, SimpleDateType.instance, ProtocolVersion.V4),
+    TIME     (18, TimeType.instance, ProtocolVersion.V4),
+    SMALLINT (19, ShortType.instance, ProtocolVersion.V4),
+    BYTE     (20, ByteType.instance, ProtocolVersion.V4),
+    LIST     (32, null, ProtocolVersion.V1),
+    MAP      (33, null, ProtocolVersion.V1),
+    SET      (34, null, ProtocolVersion.V1),
+    UDT      (48, null, ProtocolVersion.V3),
+    TUPLE    (49, null, ProtocolVersion.V3);
 
     public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
 
     private final int id;
-    private final int protocolVersion;
+    private final ProtocolVersion protocolVersion;
     private final AbstractType type;
     private final Pair<DataType, Object> pair;
     private static final Map<AbstractType, DataType> dataTypeMap = new HashMap<AbstractType, DataType>();
@@ -78,7 +78,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         }
     }
 
-    DataType(int id, AbstractType type, int protocolVersion)
+    DataType(int id, AbstractType type, ProtocolVersion protocolVersion)
     {
         this.id = id;
         this.type = type;
@@ -86,14 +86,14 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         pair = Pair.create(this, null);
     }
 
-    public int getId(int version)
+    public int getId(ProtocolVersion version)
     {
-        if (version < protocolVersion)
+        if (version.isSmallerThan(protocolVersion))
             return DataType.CUSTOM.getId(version);
         return id;
     }
 
-    public Object readValue(ByteBuf cb, int version)
+    public Object readValue(ByteBuf cb, ProtocolVersion version)
     {
         switch (this)
         {
@@ -131,10 +131,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         }
     }
 
-    public void writeValue(Object value, ByteBuf cb, int version)
+    public void writeValue(Object value, ByteBuf cb, ProtocolVersion version)
     {
         // Serialize as CUSTOM if client on the other side's version is < required for type
-        if (version < protocolVersion)
+        if (version.isSmallerThan(protocolVersion))
         {
             CBUtil.writeString(value.toString(), cb);
             return;
@@ -177,10 +177,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         }
     }
 
-    public int serializedValueSize(Object value, int version)
+    public int serializedValueSize(Object value, ProtocolVersion version)
     {
         // Serialize as CUSTOM if client on the other side's version is < required for type
-        if (version < protocolVersion)
+        if (version.isSmallerThan(protocolVersion))
             return CBUtil.sizeOfString(value.toString());
 
         switch (this)
@@ -219,7 +219,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         }
     }
 
-    public static Pair<DataType, Object> fromType(AbstractType type, int version)
+    public static Pair<DataType, Object> fromType(AbstractType type, ProtocolVersion version)
     {
         // For CQL3 clients, ReversedType is an implementation detail and they
         // shouldn't have to care about it.
@@ -251,10 +251,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                 throw new AssertionError();
             }
 
-            if (type instanceof UserType && version >= UDT.protocolVersion)
+            if (type instanceof UserType && version.isGreaterOrEqualTo(UDT.protocolVersion))
                 return Pair.<DataType, Object>create(UDT, type);
 
-            if (type instanceof TupleType && version >= TUPLE.protocolVersion)
+            if (type instanceof TupleType && version.isGreaterOrEqualTo(TUPLE.protocolVersion))
                 return Pair.<DataType, Object>create(TUPLE, type);
 
             return Pair.<DataType, Object>create(CUSTOM, type.toString());
@@ -262,7 +262,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         else
         {
             // Fall back to CUSTOM if target doesn't know this data type
-            if (version < dt.protocolVersion)
+            if (version.isSmallerThan(dt.protocolVersion))
                 return Pair.<DataType, Object>create(CUSTOM, type.toString());
             return dt.pair;
         }
@@ -298,7 +298,7 @@ public enum DataType implements OptionCodec.Codecable<DataType>
     }
 
     @VisibleForTesting
-    public int getProtocolVersion()
+    public ProtocolVersion getProtocolVersion()
     {
         return protocolVersion;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 1b72cbe..ed77e59 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -29,14 +29,14 @@ public abstract class Event
 {
     public enum Type
     {
-        TOPOLOGY_CHANGE(Server.VERSION_3),
-        STATUS_CHANGE(Server.VERSION_3),
-        SCHEMA_CHANGE(Server.VERSION_3),
-        TRACE_COMPLETE(Server.VERSION_4);
+        TOPOLOGY_CHANGE(ProtocolVersion.V3),
+        STATUS_CHANGE(ProtocolVersion.V3),
+        SCHEMA_CHANGE(ProtocolVersion.V3),
+        TRACE_COMPLETE(ProtocolVersion.V4);
 
-        public final int minimumVersion;
+        public final ProtocolVersion minimumVersion;
 
-        Type(int minimumVersion)
+        Type(ProtocolVersion minimumVersion)
         {
             this.minimumVersion = minimumVersion;
         }
@@ -49,10 +49,10 @@ public abstract class Event
         this.type = type;
     }
 
-    public static Event deserialize(ByteBuf cb, int version)
+    public static Event deserialize(ByteBuf cb, ProtocolVersion version)
     {
         Type eventType = CBUtil.readEnumValue(Type.class, cb);
-        if (eventType.minimumVersion > version)
+        if (eventType.minimumVersion.isGreaterThan(version))
             throw new ProtocolException("Event " + eventType.name() + " not valid for protocol version " + version);
         switch (eventType)
         {
@@ -66,21 +66,21 @@ public abstract class Event
         throw new AssertionError();
     }
 
-    public void serialize(ByteBuf dest, int version)
+    public void serialize(ByteBuf dest, ProtocolVersion version)
     {
-        if (type.minimumVersion > version)
+        if (type.minimumVersion.isGreaterThan(version))
             throw new ProtocolException("Event " + type.name() + " not valid for protocol version " + version);
         CBUtil.writeEnumValue(type, dest);
         serializeEvent(dest, version);
     }
 
-    public int serializedSize(int version)
+    public int serializedSize(ProtocolVersion version)
     {
         return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version);
     }
 
-    protected abstract void serializeEvent(ByteBuf dest, int version);
-    protected abstract int eventSerializedSize(int version);
+    protected abstract void serializeEvent(ByteBuf dest, ProtocolVersion version);
+    protected abstract int eventSerializedSize(ProtocolVersion version);
 
     public static abstract class NodeEvent extends Event
     {
@@ -126,20 +126,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static TopologyChange deserializeEvent(ByteBuf cb, int version)
+        private static TopologyChange deserializeEvent(ByteBuf cb, ProtocolVersion version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new TopologyChange(change, node);
         }
 
-        protected void serializeEvent(ByteBuf dest, int version)
+        protected void serializeEvent(ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeEnumValue(change, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize(int version)
+        protected int eventSerializedSize(ProtocolVersion version)
         {
             return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
         }
@@ -192,20 +192,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static StatusChange deserializeEvent(ByteBuf cb, int version)
+        private static StatusChange deserializeEvent(ByteBuf cb, ProtocolVersion version)
         {
             Status status = CBUtil.readEnumValue(Status.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new StatusChange(status, node);
         }
 
-        protected void serializeEvent(ByteBuf dest, int version)
+        protected void serializeEvent(ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeEnumValue(status, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize(int version)
+        protected int eventSerializedSize(ProtocolVersion version)
         {
             return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
         }
@@ -268,10 +268,10 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        public static SchemaChange deserializeEvent(ByteBuf cb, int version)
+        public static SchemaChange deserializeEvent(ByteBuf cb, ProtocolVersion version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
-            if (version >= Server.VERSION_3)
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V3))
             {
                 Target target = CBUtil.readEnumValue(Target.class, cb);
                 String keyspace = CBUtil.readString(cb);
@@ -290,11 +290,11 @@ public abstract class Event
             }
         }
 
-        public void serializeEvent(ByteBuf dest, int version)
+        public void serializeEvent(ByteBuf dest, ProtocolVersion version)
         {
             if (target == Target.FUNCTION || target == Target.AGGREGATE)
             {
-                if (version >= Server.VERSION_4)
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V4))
                 {
                     // available since protocol version 4
                     CBUtil.writeEnumValue(change, dest);
@@ -307,7 +307,7 @@ public abstract class Event
                 {
                     // not available in protocol versions < 4 - just say the keyspace was updated.
                     CBUtil.writeEnumValue(Change.UPDATED, dest);
-                    if (version >= 3)
+                    if (version.isGreaterOrEqualTo(ProtocolVersion.V3))
                         CBUtil.writeEnumValue(Target.KEYSPACE, dest);
                     CBUtil.writeString(keyspace, dest);
                     CBUtil.writeString("", dest);
@@ -315,7 +315,7 @@ public abstract class Event
                 return;
             }
 
-            if (version >= Server.VERSION_3)
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V3))
             {
                 CBUtil.writeEnumValue(change, dest);
                 CBUtil.writeEnumValue(target, dest);
@@ -342,17 +342,17 @@ public abstract class Event
             }
         }
 
-        public int eventSerializedSize(int version)
+        public int eventSerializedSize(ProtocolVersion version)
         {
             if (target == Target.FUNCTION || target == Target.AGGREGATE)
             {
-                if (version >= Server.VERSION_4)
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V4))
                     return CBUtil.sizeOfEnumValue(change)
                                + CBUtil.sizeOfEnumValue(target)
                                + CBUtil.sizeOfString(keyspace)
                                + CBUtil.sizeOfString(name)
                                + CBUtil.sizeOfStringList(argTypes);
-                if (version >= Server.VERSION_3)
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V3))
                     return CBUtil.sizeOfEnumValue(Change.UPDATED)
                            + CBUtil.sizeOfEnumValue(Target.KEYSPACE)
                            + CBUtil.sizeOfString(keyspace);
@@ -361,7 +361,7 @@ public abstract class Event
                        + CBUtil.sizeOfString("");
             }
 
-            if (version >= Server.VERSION_3)
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V3))
             {
                 int size = CBUtil.sizeOfEnumValue(change)
                          + CBUtil.sizeOfEnumValue(target)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index f2f6174..6cd8b1e 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -67,7 +67,7 @@ public class Frame
         return body.release();
     }
 
-    public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
+    public static Frame create(Message.Type type, int streamId, ProtocolVersion version, EnumSet<Header.Flag> flags, ByteBuf body)
     {
         Header header = new Header(version, flags, streamId, type);
         return new Frame(header, body);
@@ -80,12 +80,12 @@ public class Frame
 
         public static final int BODY_LENGTH_SIZE = 4;
 
-        public final int version;
+        public final ProtocolVersion version;
         public final EnumSet<Flag> flags;
         public final int streamId;
         public final Message.Type type;
 
-        private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type)
+        private Header(ProtocolVersion version, EnumSet<Flag> flags, int streamId, Message.Type type)
         {
             this.version = version;
             this.flags = flags;
@@ -93,7 +93,7 @@ public class Frame
             this.type = type;
         }
 
-        public static enum Flag
+        public enum Flag
         {
             // The order of that enum matters!!
             COMPRESSED,
@@ -169,9 +169,8 @@ public class Frame
             // 1 and 2 use a shorter header, so we may never have a complete header's worth of bytes.
             int firstByte = buffer.getByte(idx++);
             Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
-            int version = firstByte & PROTOCOL_VERSION_MASK;
-            if (version < Server.MIN_SUPPORTED_VERSION)
-                throw new ProtocolException(protocolVersionExceptionMessage(version), version);
+            int versionNum = firstByte & PROTOCOL_VERSION_MASK;
+            ProtocolVersion version = ProtocolVersion.decode(versionNum);
 
             // Wait until we have the complete header
             if (readableBytes < Header.LENGTH)
@@ -179,17 +178,10 @@ public class Frame
 
             int flags = buffer.getByte(idx++);
             EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags);
-            if (version > Server.CURRENT_VERSION)
-            {
-                if (version == Server.BETA_VERSION)
-                {
-                    if (!decodedFlags.contains(Header.Flag.USE_BETA))
-                        throw new ProtocolException(String.format("Beta version of the protocol used (%d), but USE_BETA flag is unset",
-                                                                  version));
-                }
-                else
-                    throw new ProtocolException(protocolVersionExceptionMessage(version));
-            }
+
+            if (version.isBeta() && !decodedFlags.contains(Header.Flag.USE_BETA))
+                throw new ProtocolException(String.format("Beta version of the protocol used (%s), but USE_BETA flag is unset", version),
+                                            version);
 
             int streamId = buffer.getShort(idx);
             idx += 2;
@@ -227,7 +219,7 @@ public class Frame
             // extract body
             ByteBuf body = buffer.slice(idx, (int) bodyLength);
             body.retain();
-            
+
             idx += bodyLength;
             buffer.readerIndex(idx);
 
@@ -243,7 +235,7 @@ public class Frame
             {
                 throw ErrorMessage.wrap(
                         new ProtocolException(String.format(
-                                "Invalid message version. Got %d but previous messages on this connection had version %d",
+                                "Invalid message version. Got %s but previous messages on this connection had version %s",
                                 version, connection.getVersion())),
                         streamId);
             }
@@ -251,12 +243,6 @@ public class Frame
             results.add(new Frame(new Header(version, decodedFlags, streamId, type), body));
         }
 
-        private static String protocolVersionExceptionMessage(int version)
-        {
-            return String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d",
-                                 version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION);
-        }
-
         private void fail()
         {
             // Reset to the initial state and throw the exception
@@ -285,12 +271,12 @@ public class Frame
             ByteBuf header = CBUtil.allocator.buffer(Header.LENGTH);
 
             Message.Type type = frame.header.type;
-            header.writeByte(type.direction.addToVersion(frame.header.version));
+            header.writeByte(type.direction.addToVersion(frame.header.version.asInt()));
             header.writeByte(Header.Flag.serialize(frame.header.flags));
 
             // Continue to support writing pre-v3 headers so that we can give proper error messages to drivers that
             // connect with the v1/v2 protocol. See CASSANDRA-11464.
-            if (frame.header.version >= Server.VERSION_3)
+            if (frame.header.version.isGreaterOrEqualTo(ProtocolVersion.V3))
                 header.writeShort(frame.header.streamId);
             else
                 header.writeByte(frame.header.streamId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 6f3b0f8..fc8cd93 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -121,7 +121,7 @@ public abstract class Message
             }
         }
 
-        private Type(int opcode, Direction direction, Codec<?> codec)
+        Type(int opcode, Direction direction, Codec<?> codec)
         {
             this.opcode = opcode;
             this.direction = direction;
@@ -150,7 +150,7 @@ public abstract class Message
     private int streamId;
     private Frame sourceFrame;
     private Map<String, ByteBuffer> customPayload;
-    protected Integer forcedProtocolVersion = null;
+    protected ProtocolVersion forcedProtocolVersion = null;
 
     protected Message(Type type)
     {
@@ -275,7 +275,7 @@ public abstract class Message
 
             try
             {
-                if (isCustomPayload && frame.header.version < Server.VERSION_4)
+                if (isCustomPayload && frame.header.version.isSmallerThan(ProtocolVersion.V4))
                     throw new ProtocolException("Received frame with CUSTOM_PAYLOAD flag for native protocol version < 4");
 
                 Message message = frame.header.type.codec.decode(frame.body, frame.header.version);
@@ -319,11 +319,8 @@ public abstract class Message
         {
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
             // The only case the connection can be null is when we send the initial STARTUP message (client side thus)
-            int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion();
-
+            ProtocolVersion version = connection == null ? ProtocolVersion.CURRENT : connection.getVersion();
             EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class);
-            if (version == Server.BETA_VERSION)
-                flags.add(Frame.Header.Flag.USE_BETA);
 
             Codec<Message> codec = (Codec<Message>)message.type.codec;
             try
@@ -339,13 +336,13 @@ public abstract class Message
                     List<String> warnings = ((Response)message).getWarnings();
                     if (warnings != null)
                     {
-                        if (version < Server.VERSION_4)
+                        if (version.isSmallerThan(ProtocolVersion.V4))
                             throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4");
                         messageSize += CBUtil.sizeOfStringList(warnings);
                     }
                     if (customPayload != null)
                     {
-                        if (version < Server.VERSION_4)
+                        if (version.isSmallerThan(ProtocolVersion.V4))
                             throw new ProtocolException("Must not send frame with CUSTOM_PAYLOAD flag for native protocol version < 4");
                         messageSize += CBUtil.sizeOfBytesMap(customPayload);
                     }
@@ -394,9 +391,13 @@ public abstract class Message
 
                 // if the driver attempted to connect with a protocol version lower than the minimum supported
                 // version, respond with a protocol error message with the correct frame header for that version
-                int responseVersion = message.forcedProtocolVersion == null
+                ProtocolVersion responseVersion = message.forcedProtocolVersion == null
                                     ? version
                                     : message.forcedProtocolVersion;
+
+                if (responseVersion.isBeta())
+                    flags.add(Frame.Header.Flag.USE_BETA);
+
                 results.add(Frame.create(message.type, message.getStreamId(), responseVersion, flags, body));
             }
             catch (Throwable e)
@@ -507,7 +508,7 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 connection = (ServerConnection)request.connection();
-                if (connection.getVersion() >= Server.VERSION_4)
+                if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
                     ClientWarn.instance.captureWarnings();
 
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 3a8b813..cdfadf6 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -30,11 +30,11 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
 {
     public interface Codecable<T extends Enum<T>>
     {
-        public int getId(int version);
+        public int getId(ProtocolVersion version);
 
-        public Object readValue(ByteBuf cb, int version);
-        public void writeValue(Object value, ByteBuf cb, int version);
-        public int serializedValueSize(Object obj, int version);
+        public Object readValue(ByteBuf cb, ProtocolVersion version);
+        public void writeValue(Object value, ByteBuf cb, ProtocolVersion version);
+        public int serializedValueSize(Object obj, ProtocolVersion version);
     }
 
     private final Class<T> klass;
@@ -48,13 +48,13 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         T[] values = klass.getEnumConstants();
         int maxId = -1;
         for (T opt : values)
-            maxId = Math.max(maxId, opt.getId(Server.CURRENT_VERSION));
+            maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT));
         ids = (T[])Array.newInstance(klass, maxId + 1);
         for (T opt : values)
         {
-            if (ids[opt.getId(Server.CURRENT_VERSION)] != null)
-                throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId(Server.CURRENT_VERSION)));
-            ids[opt.getId(Server.CURRENT_VERSION)] = opt;
+            if (ids[opt.getId(ProtocolVersion.CURRENT)] != null)
+                throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId(ProtocolVersion.CURRENT)));
+            ids[opt.getId(ProtocolVersion.CURRENT)] = opt;
         }
     }
 
@@ -66,7 +66,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return opt;
     }
 
-    public Map<T, Object> decode(ByteBuf body, int version)
+    public Map<T, Object> decode(ByteBuf body, ProtocolVersion version)
     {
         EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
         int n = body.readUnsignedShort();
@@ -81,7 +81,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return options;
     }
 
-    public ByteBuf encode(Map<T, Object> options, int version)
+    public ByteBuf encode(Map<T, Object> options, ProtocolVersion version)
     {
         int optLength = 2;
         for (Map.Entry<T, Object> entry : options.entrySet())
@@ -97,14 +97,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return cb;
     }
 
-    public Pair<T, Object> decodeOne(ByteBuf body, int version)
+    public Pair<T, Object> decodeOne(ByteBuf body, ProtocolVersion version)
     {
         T opt = fromId(body.readUnsignedShort());
         Object value = opt.readValue(body, version);
         return Pair.create(opt, value);
     }
 
-    public void writeOne(Pair<T, Object> option, ByteBuf dest, int version)
+    public void writeOne(Pair<T, Object> option, ByteBuf dest, ProtocolVersion version)
     {
         T opt = option.left;
         Object obj = option.right;
@@ -112,7 +112,7 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         opt.writeValue(obj, dest, version);
     }
 
-    public int oneSerializedSize(Pair<T, Object> option, int version)
+    public int oneSerializedSize(Pair<T, Object> option, ProtocolVersion version)
     {
         T opt = option.left;
         Object obj = option.right;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ProtocolException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolException.java b/src/java/org/apache/cassandra/transport/ProtocolException.java
index 9d8c270..6ef17ac 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolException.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolException.java
@@ -25,19 +25,17 @@ import org.apache.cassandra.exceptions.TransportException;
  */
 public class ProtocolException extends RuntimeException implements TransportException
 {
-    private final Integer attemptedLowProtocolVersion;
+    private final ProtocolVersion forcedProtocolVersion;
 
     public ProtocolException(String msg)
     {
         this(msg, null);
     }
 
-    public ProtocolException(String msg, Integer attemptedLowProtocolVersion)
+    public ProtocolException(String msg, ProtocolVersion forcedProtocolVersion)
     {
         super(msg);
-        assert attemptedLowProtocolVersion == null || attemptedLowProtocolVersion < Server.MIN_SUPPORTED_VERSION;
-
-        this.attemptedLowProtocolVersion = attemptedLowProtocolVersion;
+        this.forcedProtocolVersion = forcedProtocolVersion;
     }
 
     public ExceptionCode code()
@@ -45,13 +43,8 @@ public class ProtocolException extends RuntimeException implements TransportExce
         return ExceptionCode.PROTOCOL_ERROR;
     }
 
-    /**
-     * If the ProtocolException is due to a connection being made with a protocol version that is lower
-     * than Server.MIN_SUPPORTED_VERSION, this will return that unsupported protocol version.  Otherwise,
-     * null is returned.
-     */
-    public Integer getAttemptedLowProtocolVersion()
+    public ProtocolVersion getForcedProtocolVersion()
     {
-        return attemptedLowProtocolVersion;
+        return forcedProtocolVersion;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ProtocolVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
new file mode 100644
index 0000000..cd73c86
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * The native (CQL binary) protocol version.
+ *
+ * Some versions may be in beta, which means that the client must
+ * specify the beta flag in the frame for the version to be considered valid.
+ * Beta versions must have the word "beta" in their description, this is mandated
+ * by the specs.
+ *
+ */
+public enum ProtocolVersion implements Comparable<ProtocolVersion>
+{
+    // The order is important as it defines the chronological history of versions, which is used
+    // to determine if a feature is supported or some serdes formats
+    V1(1, "v1", false), // no longer supported
+    V2(2, "v2", false), // no longer supported
+    V3(3, "v3", false),
+    V4(4, "v4", false),
+    V5(5, "v5-beta", true);
+
+    /** The version number */
+    private final int num;
+
+    /** A description of the version, beta versions should have the word "-beta" */
+    private final String descr;
+
+    /** Set this to true for beta versions */
+    private final boolean beta;
+
+    ProtocolVersion(int num, String descr, boolean beta)
+    {
+        this.num = num;
+        this.descr = descr;
+        this.beta = beta;
+    }
+
+    /** The supported versions stored as an array, these should be private and are required for fast decoding*/
+    private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5 };
+    final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0];
+    final static ProtocolVersion MAX_SUPPORTED_VERSION = SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1];
+
+    /** All supported versions, published as an enumset */
+    public final static EnumSet<ProtocolVersion> SUPPORTED = EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) ArrayUtils.addAll(SUPPORTED_VERSIONS)));
+
+    /** Old unsupported versions, this is OK as long as we never add newer unsupported versions */
+    public final static EnumSet<ProtocolVersion> UNSUPPORTED = EnumSet.complementOf(SUPPORTED);
+
+    /** The preferred versions */
+    public final static ProtocolVersion CURRENT = V4;
+    public final static Optional<ProtocolVersion> BETA = Optional.of(V5);
+
+    public static List<String> supportedVersions()
+    {
+        List<String> ret = new ArrayList<>(SUPPORTED.size());
+        for (ProtocolVersion version : SUPPORTED)
+            ret.add(version.toString());
+        return ret;
+    }
+
+    public static ProtocolVersion decode(int versionNum)
+    {
+        ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= MAX_SUPPORTED_VERSION.num
+                              ? SUPPORTED_VERSIONS[versionNum - MIN_SUPPORTED_VERSION.num]
+                              : null;
+
+        if (ret == null)
+        {
+            // if this is not a supported version check the old versions
+            for (ProtocolVersion version : UNSUPPORTED)
+            {
+                // if it is an old version that is no longer supported this ensures that we reply
+                // with that same version
+                if (version.num == versionNum)
+                    throw new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version);
+            }
+
+            // If the version is invalid reply with the highest version that we support
+            throw new ProtocolException(invalidVersionMessage(versionNum), MAX_SUPPORTED_VERSION);
+        }
+
+        return ret;
+    }
+
+    public boolean isBeta()
+    {
+        return beta;
+    }
+
+    public static String invalidVersionMessage(int version)
+    {
+        return String.format("Invalid or unsupported protocol version (%d); supported versions are (%s)",
+                             version, String.join(", ", ProtocolVersion.supportedVersions()));
+    }
+
+    public int asInt()
+    {
+        return num;
+    }
+
+    @Override
+    public String toString()
+    {
+        // This format is mandated by the protocl specs for the SUPPORTED message, see OptionsMessage execute().
+        return String.format("%d/%s", num, descr);
+    }
+
+    public final boolean isGreaterThan(ProtocolVersion other)
+    {
+        return num > other.num;
+    }
+
+    public final boolean isGreaterOrEqualTo(ProtocolVersion other)
+    {
+        return num >= other.num;
+    }
+
+    public final boolean isSmallerThan(ProtocolVersion other)
+    {
+        return num < other.num;
+    }
+
+    public final boolean isSmallerOrEqualTo(ProtocolVersion other)
+    {
+        return num <= other.num;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 267d532..1eeecac 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -64,18 +64,11 @@ public class Server implements CassandraDaemon.Server
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
     private static final boolean useEpoll = NativeTransportService.useEpoll();
 
-    public static final int VERSION_3 = 3;
-    public static final int VERSION_4 = 4;
-    public static final int VERSION_5 = 5;
-    public static final int CURRENT_VERSION = VERSION_4;
-    public static final int BETA_VERSION = VERSION_5;
-    public static final int MIN_SUPPORTED_VERSION = VERSION_3;
-
     private final ConnectionTracker connectionTracker = new ConnectionTracker();
 
     private final Connection.Factory connectionFactory = new Connection.Factory()
     {
-        public Connection newConnection(Channel channel, int version)
+        public Connection newConnection(Channel channel, ProtocolVersion version)
         {
             return new ServerConnection(channel, version, connectionTracker);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 1ef6c73..9374ca0 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -36,7 +36,7 @@ public class ServerConnection extends Connection
 
     private final ConcurrentMap<Integer, QueryState> queryStates = new ConcurrentHashMap<>();
 
-    public ServerConnection(Channel channel, int version, Connection.Tracker tracker)
+    public ServerConnection(Channel channel, ProtocolVersion version, Connection.Tracker tracker)
     {
         super(channel, version, tracker);
         this.clientState = ClientState.forExternalCalls(channel.remoteAddress());
@@ -56,7 +56,7 @@ public class ServerConnection extends Connection
         return qState;
     }
 
-    public QueryState validateNewMessage(Message.Type type, int version, int streamId)
+    public QueryState validateNewMessage(Message.Type type, ProtocolVersion version, int streamId)
     {
         switch (state)
         {
@@ -67,7 +67,7 @@ public class ServerConnection extends Connection
             case AUTHENTICATION:
                 // Support both SASL auth from protocol v2 and the older style Credentials auth from v1
                 if (type != Message.Type.AUTH_RESPONSE && type != Message.Type.CREDENTIALS)
-                    throw new ProtocolException(String.format("Unexpected message %s, expecting %s", type, version == 1 ? "CREDENTIALS" : "SASL_RESPONSE"));
+                    throw new ProtocolException(String.format("Unexpected message %s, expecting %s", type, version == ProtocolVersion.V1 ? "CREDENTIALS" : "SASL_RESPONSE"));
                 break;
             case READY:
                 if (type == Message.Type.STARTUP)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 4d8a30b..1bb081b 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -75,7 +75,7 @@ public class SimpleClient implements Closeable
 
     protected final ResponseHandler responseHandler = new ResponseHandler();
     protected final Connection.Tracker tracker = new ConnectionTracker();
-    protected final int version;
+    protected final ProtocolVersion version;
     // We don't track connection really, so we don't need one Connection per channel
     protected Connection connection;
     protected Bootstrap bootstrap;
@@ -84,32 +84,32 @@ public class SimpleClient implements Closeable
 
     private final Connection.Factory connectionFactory = new Connection.Factory()
     {
-        public Connection newConnection(Channel channel, int version)
+        public Connection newConnection(Channel channel, ProtocolVersion version)
         {
             return connection;
         }
     };
 
-    public SimpleClient(String host, int port, int version, ClientEncryptionOptions encryptionOptions)
+    public SimpleClient(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
     {
         this(host, port, version, false, encryptionOptions);
     }
 
     public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions)
     {
-        this(host, port, Server.CURRENT_VERSION, encryptionOptions);
+        this(host, port, ProtocolVersion.CURRENT, encryptionOptions);
     }
 
-    public SimpleClient(String host, int port, int version)
+    public SimpleClient(String host, int port, ProtocolVersion version)
     {
         this(host, port, version, new ClientEncryptionOptions());
     }
 
-    public SimpleClient(String host, int port, int version, boolean useBeta, ClientEncryptionOptions encryptionOptions)
+    public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, ClientEncryptionOptions encryptionOptions)
     {
         this.host = host;
         this.port = port;
-        if (version == Server.BETA_VERSION && !useBeta)
+        if (version.isBeta() && !useBeta)
             throw new IllegalArgumentException(String.format("Beta version of server used (%s), but USE_BETA flag is not set", version));
 
         this.version = version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index 15a9a9a..fda83f9 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport.messages;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 import java.nio.ByteBuffer;
 
@@ -30,7 +31,7 @@ public class AuthChallenge extends Message.Response
 {
     public static final Message.Codec<AuthChallenge> codec = new Message.Codec<AuthChallenge>()
     {
-        public AuthChallenge decode(ByteBuf body, int version)
+        public AuthChallenge decode(ByteBuf body, ProtocolVersion version)
         {
             ByteBuffer b = CBUtil.readValue(body);
             byte[] token = new byte[b.remaining()];
@@ -38,12 +39,12 @@ public class AuthChallenge extends Message.Response
             return new AuthChallenge(token);
         }
 
-        public void encode(AuthChallenge challenge, ByteBuf dest, int version)
+        public void encode(AuthChallenge challenge, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeValue(challenge.token, dest);
         }
 
-        public int encodedSize(AuthChallenge challenge, int version)
+        public int encodedSize(AuthChallenge challenge, ProtocolVersion version)
         {
             return CBUtil.sizeOfValue(challenge.token);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index e90f740..332b024 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -36,9 +36,9 @@ public class AuthResponse extends Message.Request
 {
     public static final Message.Codec<AuthResponse> codec = new Message.Codec<AuthResponse>()
     {
-        public AuthResponse decode(ByteBuf body, int version)
+        public AuthResponse decode(ByteBuf body, ProtocolVersion version)
         {
-            if (version == 1)
+            if (version == ProtocolVersion.V1)
                 throw new ProtocolException("SASL Authentication is not supported in version 1 of the protocol");
 
             ByteBuffer b = CBUtil.readValue(body);
@@ -47,12 +47,12 @@ public class AuthResponse extends Message.Request
             return new AuthResponse(token);
         }
 
-        public void encode(AuthResponse response, ByteBuf dest, int version)
+        public void encode(AuthResponse response, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeValue(response.token, dest);
         }
 
-        public int encodedSize(AuthResponse response, int version)
+        public int encodedSize(AuthResponse response, ProtocolVersion version)
         {
             return CBUtil.sizeOfValue(response.token);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index 8c1b5b1..b8ed7f0 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport.messages;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 import java.nio.ByteBuffer;
 
@@ -33,7 +34,7 @@ public class AuthSuccess extends Message.Response
 {
     public static final Message.Codec<AuthSuccess> codec = new Message.Codec<AuthSuccess>()
     {
-        public AuthSuccess decode(ByteBuf body, int version)
+        public AuthSuccess decode(ByteBuf body, ProtocolVersion version)
         {
             ByteBuffer b = CBUtil.readValue(body);
             byte[] token = null;
@@ -45,12 +46,12 @@ public class AuthSuccess extends Message.Response
             return new AuthSuccess(token);
         }
 
-        public void encode(AuthSuccess success, ByteBuf dest, int version)
+        public void encode(AuthSuccess success, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeValue(success.token, dest);
         }
 
-        public int encodedSize(AuthSuccess success, int version)
+        public int encodedSize(AuthSuccess success, ProtocolVersion version)
         {
             return CBUtil.sizeOfValue(success.token);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index 230f0f2..1261083 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 /**
  * Message to indicate that the server is ready to receive requests.
@@ -29,18 +30,18 @@ public class AuthenticateMessage extends Message.Response
 {
     public static final Message.Codec<AuthenticateMessage> codec = new Message.Codec<AuthenticateMessage>()
     {
-        public AuthenticateMessage decode(ByteBuf body, int version)
+        public AuthenticateMessage decode(ByteBuf body, ProtocolVersion version)
         {
             String authenticator = CBUtil.readString(body);
             return new AuthenticateMessage(authenticator);
         }
 
-        public void encode(AuthenticateMessage msg, ByteBuf dest, int version)
+        public void encode(AuthenticateMessage msg, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeString(msg.authenticator, dest);
         }
 
-        public int encodedSize(AuthenticateMessage msg, int version)
+        public int encodedSize(AuthenticateMessage msg, ProtocolVersion version)
         {
             return CBUtil.sizeOfString(msg.authenticator);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0adc166/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 6675565..bb6411f 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -43,7 +43,7 @@ public class BatchMessage extends Message.Request
 {
     public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>()
     {
-        public BatchMessage decode(ByteBuf body, int version)
+        public BatchMessage decode(ByteBuf body, ProtocolVersion version)
         {
             byte type = body.readByte();
             int n = body.readUnsignedShort();
@@ -65,7 +65,7 @@ public class BatchMessage extends Message.Request
             return new BatchMessage(toType(type), queryOrIds, variables, options);
         }
 
-        public void encode(BatchMessage msg, ByteBuf dest, int version)
+        public void encode(BatchMessage msg, ByteBuf dest, ProtocolVersion version)
         {
             int queries = msg.queryOrIdList.size();
 
@@ -84,13 +84,13 @@ public class BatchMessage extends Message.Request
                 CBUtil.writeValueList(msg.values.get(i), dest);
             }
 
-            if (version < Server.VERSION_3)
+            if (version.isSmallerThan(ProtocolVersion.V3))
                 CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
             else
                 QueryOptions.codec.encode(msg.options, dest, version);
         }
 
-        public int encodedSize(BatchMessage msg, int version)
+        public int encodedSize(BatchMessage msg, ProtocolVersion version)
         {
             int size = 3; // type + nb queries
             for (int i = 0; i < msg.queryOrIdList.size(); i++)
@@ -102,7 +102,7 @@ public class BatchMessage extends Message.Request
 
                 size += CBUtil.sizeOfValueList(msg.values.get(i));
             }
-            size += version < Server.VERSION_3
+            size += version.isSmallerThan(ProtocolVersion.V3)
                   ? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency())
                   : QueryOptions.codec.encodedSize(msg.options, version);
             return size;


Mime
View raw message